ant_quic/high_level/connection.rs

// Copyright 2024 Saorsa Labs Ltd.
//
// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
//
// Full details available at https://saorsalabs.com/licenses

use std::{
    any::Any,
    fmt,
    future::Future,
    io,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker, ready},
};

use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
use tracing::{Instrument, Span, debug_span, error};

use super::{
    ConnectionEvent,
    mutex::Mutex,
    recv_stream::RecvStream,
    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
    send_stream::SendStream,
    udp_transmit,
};
use crate::{
    ConnectError, ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent,
    Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
};
/// In-progress connection attempt future
#[derive(Debug)]
pub struct Connecting {
    conn: Option<ConnectionRef>,
    connected: oneshot::Receiver<bool>,
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}

impl Connecting {
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();
        let conn = ConnectionRef::new(
            handle,
            conn,
            endpoint_events,
            conn_events,
            on_handshake_data_send,
            on_connected_send,
            socket,
            runtime.clone(),
        );

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    tracing::error!("I/O error: {e}");
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
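    ///
    /// # Example
    ///
    /// A minimal sketch of the outgoing case; the `connecting` value, error
    /// handling, and payload are assumptions for illustration:
    ///
    /// ```ignore
    /// match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // Data sent here is 0-RTT and replayable: idempotent operations only.
    ///         if zero_rtt_accepted.await {
    ///             // Handshake complete and the server accepted our 0-RTT data.
    ///         }
    ///     }
    ///     Err(connecting) => {
    ///         // 0-RTT unavailable; fall back to the full handshake.
    ///         let connection = connecting.await?;
    ///     }
    /// }
    /// ```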
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = match self.conn.as_mut() {
            Some(conn) => conn.state.lock("into_0rtt"),
            None => {
                return Err(self);
            }
        };

        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            match self.conn.take() {
                Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
                None => {
                    tracing::error!("Connection state missing during 0-RTT acceptance");
                    Err(self)
                }
            }
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
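    ///
    /// # Example
    ///
    /// A minimal downcasting sketch for the default `rustls` session; the
    /// `protocol` field follows the quinn-style `HandshakeData` and is an
    /// assumption here:
    ///
    /// ```ignore
    /// let data = connecting.handshake_data().await?;
    /// if let Ok(data) = data.downcast::<crate::crypto::rustls::HandshakeData>() {
    ///     println!("negotiated ALPN: {:?}", data.protocol);
    /// }
    /// ```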
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().ok_or_else(|| {
            tracing::error!("Connection state missing while retrieving handshake data");
            ConnectionError::LocallyClosed
        })?;
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                inner.error.clone().unwrap_or_else(|| {
                    error!("Spurious handshake data ready notification with no error");
                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
                        "Spurious handshake notification".to_string(),
                    ))
                })
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See quinn_udp's `RecvMeta::dst_ip` for a list of
    /// supported platforms when using quinn_udp for I/O, which is the default.
    ///
    /// Also returns `None` if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref()?;
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Returns an error if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
        let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
            error!("Connection used after yielding Ready");
            ConnectionError::LocallyClosed
        })?;
        Ok(conn_ref.state.lock("remote_address").inner.remote_address())
    }
}

impl Future for Connecting {
    type Output = Result<Connection, ConnectionError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.connected).poll(cx).map(|_| {
            let conn = self.conn.take().ok_or_else(|| {
                error!("Connection not available when connecting future resolves");
                ConnectionError::LocallyClosed
            })?;
            let inner = conn.state.lock("connecting");
            if inner.connected {
                drop(inner);
                Ok(Connection(conn))
            } else {
                Err(inner.error.clone().unwrap_or_else(|| {
                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
                        "connection failed without error".to_string(),
                    ))
                }))
            }
        })
    }
}

/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);

impl Future for ZeroRttAccepted {
    type Output = bool;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
    }
}

/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events between the
/// `Connection` API object, the `Endpoint` task, and the stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        // Kick off automatic channel binding once connected, if configured
        if conn.connected && !conn.binding_started {
            if let Some(rt) = crate::trust::global_runtime() {
                // Delay NEW_TOKEN until binding completes
                conn.inner.set_delay_new_token_until_binding(true);

                let hl_conn_server = Connection(self.0.clone());
                let hl_conn_client = hl_conn_server.clone();
                let store = rt.store.clone();
                let policy = rt.policy.clone();
                let signer = rt.local_secret_key.clone();
                let spki = rt.local_spki.clone();
                let runtime = conn.runtime.clone();

                if conn.inner.side().is_server() {
                    runtime.spawn(Box::pin(async move {
                        match crate::trust::recv_verify_binding(&hl_conn_server, &*store, &policy)
                            .await
                        {
                            Ok(peer) => {
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("set peer")
                                    .inner
                                    .set_token_binding_peer_id(peer);
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("allow tokens")
                                    .inner
                                    .set_delay_new_token_until_binding(false);
                            }
                            Err(_e) => {
                                hl_conn_server.close(0u32.into(), b"channel binding failed");
                            }
                        }
                    }));
                }

                if conn.inner.side().is_client() {
                    runtime.spawn(Box::pin(async move {
                        if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
                            let _ =
                                crate::trust::send_binding(&hl_conn_client, &exp, &signer, &spki)
                                    .await;
                        }
                    }));
                }

                conn.binding_started = true;
            }
        }

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}

/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);

impl Connection {
    /// Initiate a new outgoing unidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used.
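    ///
    /// # Example
    ///
    /// A minimal sketch; `connection` is an established [`Connection`], and the
    /// write/finish calls assume quinn-style [`SendStream`](crate::SendStream)
    /// methods:
    ///
    /// ```ignore
    /// let mut send = connection.open_uni().await?;
    /// send.write_all(b"hello").await?;
    /// send.finish()?;
    /// ```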
    pub fn open_uni(&self) -> OpenUni<'_> {
        OpenUni {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
        }
    }

    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: Self::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
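    ///
    /// # Example
    ///
    /// A minimal request/response sketch; `connection` is an established
    /// [`Connection`], and the stream I/O calls assume quinn-style APIs:
    ///
    /// ```ignore
    /// let (mut send, mut recv) = connection.open_bi().await?;
    /// // Write first, so the peer's `accept_bi()` can observe the stream.
    /// send.write_all(b"request").await?;
    /// send.finish()?;
    /// let response = recv.read_to_end(64 * 1024).await?;
    /// ```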
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }

    /// Accept the next incoming uni-directional stream
    pub fn accept_uni(&self) -> AcceptUni<'_> {
        AcceptUni {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
        }
    }

    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to [`accept_bi()`]. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: Self::accept_bi
    /// [`open_bi()`]: Self::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
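    ///
    /// # Example
    ///
    /// A minimal accept-loop sketch on the accepting side; `handle_stream` is a
    /// hypothetical per-stream handler:
    ///
    /// ```ignore
    /// while let Ok((send, recv)) = connection.accept_bi().await {
    ///     // Process each stream concurrently in its own task.
    ///     tokio::spawn(handle_stream(send, recv));
    /// }
    /// ```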
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            notify: self.0.shared.datagram_received.notified(),
        }
    }

    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
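    ///
    /// # Example
    ///
    /// A minimal sketch distinguishing routine closure from failure; the variant
    /// shapes are assumed to match the quinn-style [`ConnectionError`]:
    ///
    /// ```ignore
    /// match connection.closed().await {
    ///     ConnectionError::LocallyClosed | ConnectionError::ApplicationClosed(_) => {
    ///         // Routine shutdown.
    ///     }
    ///     e => eprintln!("connection lost: {e}"),
    /// }
    /// ```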
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
            .clone()
    }

    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    ///
    /// The sending side cannot guarantee all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`] the remote endpoint may drop any data it received but is as yet
    /// undelivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
    /// [`close()`]: Connection::close
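    ///
    /// # Example
    ///
    /// A minimal graceful-shutdown sketch for the side that received the last
    /// application data; `endpoint` is the local endpoint and is assumed here:
    ///
    /// ```ignore
    /// connection.close(0u32.into(), b"done");
    /// // Give the final CONNECTION_CLOSE frame a chance to be delivered.
    /// endpoint.wait_idle().await;
    /// ```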
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
    ///
    /// Previously queued datagrams which are still unsent may be discarded to make space for this
    /// datagram, in order of oldest to newest.
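    ///
    /// # Example
    ///
    /// A minimal sketch; `connection` is an established [`Connection`]:
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// match connection.send_datagram(Bytes::from_static(b"ping")) {
    ///     Ok(()) => {}
    ///     Err(SendDatagramError::TooLarge) => {
    ///         // Consult `max_datagram_size()` and retry with a smaller payload.
    ///     }
    ///     Err(e) => eprintln!("datagram send failed: {e}"),
    /// }
    /// ```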
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use crate::SendDatagramError::*;
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
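    ///
    /// # Example
    ///
    /// A minimal sketch that trims a payload to the current limit before
    /// sending; `data` is an assumed byte slice and truncation is only for
    /// illustration:
    ///
    /// ```ignore
    /// if let Some(max) = connection.max_datagram_size() {
    ///     let payload = &data[..data.len().min(max)];
    ///     connection.send_datagram(Bytes::copy_from_slice(payload))?;
    /// }
    /// ```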
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }

    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }

    /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
    pub fn send_nat_address_advertisement(
        &self,
        address: SocketAddr,
        priority: u32,
    ) -> Result<u64, crate::ConnectionError> {
        let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
        conn.inner.send_nat_address_advertisement(address, priority)
    }

    /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
    pub fn send_nat_punch_coordination(
        &self,
        paired_with_sequence_number: u64,
        address: SocketAddr,
        round: u32,
    ) -> Result<(), crate::ConnectionError> {
        let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
        conn.inner
            .send_nat_punch_coordination(paired_with_sequence_number, address, round)
    }

    /// The side of the connection (client or server)
    pub fn side(&self) -> Side {
        self.0.state.lock("side").inner.side()
    }

    /// The peer's UDP address
    ///
    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
    /// switching to a cellular internet connection.
    pub fn remote_address(&self) -> SocketAddr {
        self.0.state.lock("remote_address").inner.remote_address()
    }

    /// The external/reflexive address observed by the remote peer
    ///
    /// Returns the address that the remote peer has observed for this connection,
    /// as reported via OBSERVED_ADDRESS frames. This is useful for NAT traversal
    /// to learn the public address of this endpoint as seen by others.
    ///
    /// Returns `None` if:
    /// - Address discovery is not enabled
    /// - No OBSERVED_ADDRESS frame has been received yet
    /// - The connection hasn't completed the handshake
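    ///
    /// # Example
    ///
    /// A minimal NAT traversal sketch: learn the reflexive address and
    /// advertise it to the peer (the priority value is illustrative):
    ///
    /// ```ignore
    /// if let Some(addr) = connection.observed_address() {
    ///     // Share our publicly observed address for hole punching.
    ///     let _seq = connection.send_nat_address_advertisement(addr, 100)?;
    /// }
    /// ```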
    pub fn observed_address(&self) -> Option<SocketAddr> {
        self.0
            .state
            .lock("observed_address")
            .inner
            .observed_address()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See quinn_udp's `RecvMeta::dst_ip` for a list of
    /// supported platforms when using quinn_udp for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }

    /// Current best estimate of this connection's latency (round-trip time)
    pub fn rtt(&self) -> Duration {
        self.0.state.lock("rtt").inner.rtt()
    }

    /// Returns connection statistics
    pub fn stats(&self) -> ConnectionStats {
        self.0.state.lock("stats").inner.stats()
    }

    /// Current state of the congestion control algorithm, for debugging purposes
    pub fn congestion_state(&self) -> Box<dyn Controller> {
        self.0
            .state
            .lock("congestion_state")
            .inner
            .congestion_state()
            .clone_box()
    }

    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }

    /// Cryptographic identity of the peer
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>.
    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("peer_identity")
            .inner
            .crypto_session()
            .peer_identity()
    }

    /// A stable identifier for this connection
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection.
    pub fn stable_id(&self) -> usize {
        self.0.stable_id()
    }

    /// Returns true if this connection negotiated post-quantum settings.
    ///
    /// This reflects either explicit PQC algorithms advertised via transport
    /// parameters or in-band detection from handshake CRYPTO frames.
    pub fn is_pqc(&self) -> bool {
        let state = self.0.state.lock("is_pqc");
        state.inner.is_pqc()
    }

    /// Debug-only hint: returns true when the underlying TLS provider was
    /// configured to run in KEM-only (ML-KEM) mode. This is a diagnostic aid
    /// for tests and does not itself guarantee group enforcement.
    pub fn debug_kem_only(&self) -> bool {
        crate::crypto::rustls::debug_kem_only_enabled()
    }

    /// Update traffic keys spontaneously
    ///
    /// This primarily exists for testing purposes.
    pub fn force_key_update(&self) {
        self.0
            .state
            .lock("force_key_update")
            .inner
            .force_key_update()
    }

    /// Derive keying material from this connection's TLS session secrets.
    ///
    /// When both peers call this method with the same `label` and `context`
    /// arguments and `output` buffers of equal length, they will get the
    /// same sequence of bytes in `output`. These bytes are cryptographically
    /// strong and pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
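    ///
    /// # Example
    ///
    /// A minimal sketch deriving 32 bytes of shared keying material; the label
    /// and context values are illustrative:
    ///
    /// ```ignore
    /// let mut key = [0u8; 32];
    /// connection.export_keying_material(&mut key, b"my-app key", b"v1")?;
    /// // Both peers now hold the same 32-byte secret.
    /// ```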
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
        self.0
            .state
            .lock("export_keying_material")
            .inner
            .crypto_session()
            .export_keying_material(output, label, context)
    }

    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// See [`crate::TransportConfig::receive_window()`]
    pub fn set_receive_window(&self, receive_window: VarInt) {
        let mut conn = self.0.state.lock("set_receive_window");
        conn.inner.set_receive_window(receive_window);
        conn.wake();
    }

    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// Set up qlog for this connection.
    #[cfg(feature = "__qlog")]
    pub fn set_qlog(
        &mut self,
        writer: Box<dyn std::io::Write + Send + Sync>,
        title: Option<String>,
        description: Option<String>,
    ) {
        let mut state = self.0.state.lock("__qlog");
        state
            .inner
            .set_qlog(writer, title, description, Instant::now());
    }
}

pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenUni<'_> {
    type Output = Result<SendStream, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptUni<'_> {
    type Output = Result<RecvStream, ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        match state.inner.datagrams().recv() {
            Some(x) => {
                return Poll::Ready(Ok(x));
            }
            _ => {
                if let Some(ref e) = state.error {
                    return Poll::Ready(Err(e.clone()));
                }
            }
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        conn: &'a ConnectionRef,
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use crate::SendDatagramError::*;
        match state.inner.datagrams().send(
            this.data.take().ok_or_else(|| {
                error!("SendDatagram future polled without data");
                SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
            })?,
            false,
        ) {
            Ok(()) => {
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

impl ConnectionRef {
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        on_handshake_data: oneshot::Sender<()>,
        on_connected: oneshot::Sender<bool>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        Self(Arc::new(ConnectionInner {
            state: Mutex::new(State {
                inner: conn,
                driver: None,
                handle,
                on_handshake_data: Some(on_handshake_data),
                on_connected: Some(on_connected),
                connected: false,
                timer: None,
                timer_deadline: None,
                conn_events,
                endpoint_events,
                blocked_writers: FxHashMap::default(),
                blocked_readers: FxHashMap::default(),
                stopped: FxHashMap::default(),
                error: None,
                ref_count: 0,
                io_poller: socket.clone().create_io_poller(),
                socket,
                runtime,
                send_buffer: Vec::new(),
                buffered_transmit: None,
                binding_started: false,
            }),
            shared: Shared::default(),
        }))
    }

    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}

impl Clone for ConnectionRef {
    fn clone(&self) -> Self {
        self.state.lock("clone").ref_count += 1;
        Self(self.0.clone())
    }
}

impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}

impl std::ops::Deref for ConnectionRef {
    type Target = ConnectionInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionInner {
    pub(crate) state: Mutex<State>,
    pub(crate) shared: Shared,
}

#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    datagram_received: Notify,
    datagrams_unblocked: Notify,
    closed: Notify,
}

pub(crate) struct State {
    pub(crate) inner: crate::Connection,
    driver: Option<Waker>,
    handle: ConnectionHandle,
    on_handshake_data: Option<oneshot::Sender<()>>,
    on_connected: Option<oneshot::Sender<bool>>,
    connected: bool,
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    socket: Arc<dyn AsyncUdpSocket>,
    io_poller: Pin<Box<dyn UdpPoller>>,
    runtime: Arc<dyn Runtime>,
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<crate::Transmit>,
    /// True once we've initiated automatic channel binding (if enabled)
    binding_started: bool,
}

impl State {
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

        let max_datagrams = self
            .socket
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
                // Retry after a future wakeup
                self.buffered_transmit = Some(t);
                return Ok(false);
            }

            let len = t.size;
            let retry = match self
                .socket
                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
            {
                Ok(()) => false,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
                Err(e) => return Err(e),
            };
            if retry {
                // We thought the socket was writable, but it wasn't. Retry so that either another
                // `poll_writable` call determines that the socket is indeed not writable and
                // registers us for a wakeup, or the send succeeds if this really was just a
                // transient failure.
                self.buffered_transmit = Some(t);
                continue;
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }

    fn forward_endpoint_events(&mut self) {
        while let Some(event) = self.inner.poll_endpoint_events() {
            // If the endpoint driver is gone, noop.
            let _ = self.endpoint_events.send((self.handle, event));
        }
    }

    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
                    self.socket = socket;
                    self.io_poller = self.socket.clone().create_io_poller();
                    self.inner.local_address_changed();
                }
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    self.inner.handle_event(event);
                }
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                Poll::Ready(None) => {
                    return Err(ConnectionError::TransportError(crate::TransportError {
                        code: crate::TransportErrorCode::INTERNAL_ERROR,
                        frame: None,
                        reason: "endpoint driver future was dropped".to_string(),
                    }));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }

    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use crate::Event::*;
            match event {
                HandshakeDataReady => {
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all_notify(&mut self.stopped);
                    }
                }
                ConnectionLost { reason } => {
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    wake_stream_notify(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
            }
        }
    }

    fn drive_timer(&mut self, cx: &mut Context) -> bool {
        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
        // timer is registered with the runtime (and check whether it's already
        // expired).
        match self.inner.poll_timeout() {
            Some(deadline) => {
                if let Some(delay) = &mut self.timer {
                    // There is no need to reset the tokio timer if the deadline
                    // did not change
                    if self
                        .timer_deadline
                        .map(|current_deadline| current_deadline != deadline)
                        .unwrap_or(true)
                    {
                        delay.as_mut().reset(deadline);
                    }
                } else {
                    self.timer = Some(self.runtime.new_timer(deadline));
                }
                // Store the actual expiration time of the timer
                self.timer_deadline = Some(deadline);
            }
            None => {
                self.timer_deadline = None;
                return false;
            }
        }

        if self.timer_deadline.is_none() {
            return false;
        }

        let delay = match self.timer.as_mut() {
            Some(timer) => timer.as_mut(),
            None => {
                error!("Timer missing in state where it should exist");
                return false;
            }
        };
        if delay.poll(cx).is_pending() {
            // Since there wasn't a timeout event, there is nothing new
            // for the connection to do
            return false;
        }

        // A timer expired, so the caller needs to check for
        // new transmits, which might cause new timers to be set.
        self.inner.handle_timeout(self.runtime.now());
        self.timer_deadline = None;
        true
    }

    /// Wake up a blocked `Driver` task to process I/O
    pub(crate) fn wake(&mut self) {
        if let Some(x) = self.driver.take() {
            x.wake();
        }
    }

    /// Used to wake up all blocked futures when the connection becomes closed for any reason
    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
        self.error = Some(reason.clone());
        if let Some(x) = self.on_handshake_data.take() {
            let _ = x.send(());
        }
        wake_all(&mut self.blocked_writers);
        wake_all(&mut self.blocked_readers);
        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
        shared.datagram_received.notify_waiters();
        shared.datagrams_unblocked.notify_waiters();
        if let Some(x) = self.on_connected.take() {
            let _ = x.send(false);
        }
        wake_all_notify(&mut self.stopped);
        shared.closed.notify_waiters();
    }

    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        self.wake();
    }

    /// Close for a reason other than the application's explicit request
    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
        self.close(0u32.into(), Bytes::new(), shared);
    }

    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
        if self.inner.is_handshaking()
            || self.inner.accepted_0rtt()
            || self.inner.side().is_server()
        {
            Ok(())
        } else {
            Err(())
        }
    }
}

impl Drop for State {
    fn drop(&mut self) {
        if !self.inner.is_drained() {
            // Ensure the endpoint can tidy up
            let _ = self
                .endpoint_events
                .send((self.handle, crate::EndpointEvent::drained()));
        }
    }
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("State").field("inner", &self.inner).finish()
    }
}

fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
    if let Some(waker) = wakers.remove(&stream_id) {
        waker.wake();
    }
}

fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
    wakers.drain().for_each(|(_, waker)| waker.wake())
}

fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    if let Some(notify) = wakers.remove(&stream_id) {
        notify.notify_waiters()
    }
}

fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    wakers
        .drain()
        .for_each(|(_, notify)| notify.notify_waiters())
}

/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}

/// The maximum number of datagrams that will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;