ant_quic/high_level/
connection.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    any::Any,
10    fmt,
11    future::Future,
12    io,
13    net::{IpAddr, SocketAddr},
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll, Waker, ready},
17};
18
19use bytes::Bytes;
20use pin_project_lite::pin_project;
21use rustc_hash::FxHashMap;
22use thiserror::Error;
23use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
24use tracing::{Instrument, Span, debug_span, error};
25
26use super::{
27    ConnectionEvent,
28    mutex::Mutex,
29    recv_stream::RecvStream,
30    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
31    send_stream::SendStream,
32    udp_transmit,
33};
34use crate::{
35    ConnectError, ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent,
36    Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
37};
38
39/// In-progress connection attempt future
40#[derive(Debug)]
41pub struct Connecting {
42    conn: Option<ConnectionRef>,
43    connected: oneshot::Receiver<bool>,
44    handshake_data_ready: Option<oneshot::Receiver<()>>,
45}
46
47impl Connecting {
48    pub(crate) fn new(
49        handle: ConnectionHandle,
50        conn: crate::Connection,
51        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53        socket: Arc<dyn AsyncUdpSocket>,
54        runtime: Arc<dyn Runtime>,
55    ) -> Self {
56        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57        let (on_connected_send, on_connected_recv) = oneshot::channel();
58        let conn = ConnectionRef::new(
59            handle,
60            conn,
61            endpoint_events,
62            conn_events,
63            on_handshake_data_send,
64            on_connected_send,
65            socket,
66            runtime.clone(),
67        );
68
69        let driver = ConnectionDriver(conn.clone());
70        runtime.spawn(Box::pin(
71            async {
72                if let Err(e) = driver.await {
73                    tracing::error!("I/O error: {e}");
74                }
75            }
76            .instrument(Span::current()),
77        ));
78
79        Self {
80            conn: Some(conn),
81            connected: on_connected_recv,
82            handshake_data_ready: Some(on_handshake_data_recv),
83        }
84    }
85
86    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87    ///
88    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89    /// If so, the returned [`Connection`] can be used to send application data without waiting for
90    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92    /// complete, at which point subsequently opened streams and written data will have full
93    /// cryptographic protection.
94    ///
95    /// ## Outgoing
96    ///
97    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103    /// If it was rejected, the existence of streams opened and other application data sent prior
104    /// to the handshake completing will not be conveyed to the remote application, and local
105    /// operations on them will return `ZeroRttRejected` errors.
106    ///
107    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108    /// relevant resumption state to be stored in the server, which servers may limit or lose for
109    /// various reasons including not persisting resumption state across server restarts.
110    ///
111    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112    /// implementation's docs for 0-RTT pitfalls.
113    ///
114    /// ## Incoming
115    ///
116    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118    ///
119    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120    /// implementation's docs for 0-RTT pitfalls.
121    ///
122    /// ## Security
123    ///
124    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125    /// replay attacks, and should therefore never invoke non-idempotent operations.
126    ///
127    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128    /// before TLS client authentication has occurred, and should therefore not be used to send
129    /// data for which client authentication is being used.
130    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132        // have to release it explicitly before returning `self` by value.
133        let conn = match self.conn.as_mut() {
134            Some(conn) => conn.state.lock("into_0rtt"),
135            None => {
136                return Err(self);
137            }
138        };
139
140        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
141        drop(conn);
142
143        if is_ok {
144            match self.conn.take() {
145                Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
146                None => {
147                    tracing::error!("Connection state missing during 0-RTT acceptance");
148                    Err(self)
149                }
150            }
151        } else {
152            Err(self)
153        }
154    }
155
156    /// Parameters negotiated during the handshake
157    ///
158    /// The dynamic type returned is determined by the configured
159    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
160    /// be [`downcast`](Box::downcast) to a
161    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
162    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
163        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
164        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
165        // simple.
166        if let Some(x) = self.handshake_data_ready.take() {
167            let _ = x.await;
168        }
169        let conn = self.conn.as_ref().ok_or_else(|| {
170            tracing::error!("Connection state missing while retrieving handshake data");
171            ConnectionError::LocallyClosed
172        })?;
173        let inner = conn.state.lock("handshake");
174        inner
175            .inner
176            .crypto_session()
177            .handshake_data()
178            .ok_or_else(|| {
179                inner.error.clone().unwrap_or_else(|| {
180                    error!("Spurious handshake data ready notification with no error");
181                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
182                        "Spurious handshake notification".to_string(),
183                    ))
184                })
185            })
186    }
187
188    /// The local IP address which was used when the peer established
189    /// the connection
190    ///
191    /// This can be different from the address the endpoint is bound to, in case
192    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
193    ///
194    /// This will return `None` for clients, or when the platform does not expose this
195    /// information. See quinn_udp's RecvMeta::dst_ip for a list of
196    /// supported platforms when using quinn_udp for I/O, which is the default.
197    ///
198    /// Will panic if called after `poll` has returned `Ready`.
199    pub fn local_ip(&self) -> Option<IpAddr> {
200        let conn = self.conn.as_ref()?;
201        let inner = conn.state.lock("local_ip");
202
203        inner.inner.local_ip()
204    }
205
206    /// The peer's UDP address
207    ///
208    /// Returns an error if called after `poll` has returned `Ready`.
209    pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
210        let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
211            error!("Connection used after yielding Ready");
212            ConnectionError::LocallyClosed
213        })?;
214        Ok(conn_ref.state.lock("remote_address").inner.remote_address())
215    }
216}
217
218impl Future for Connecting {
219    type Output = Result<Connection, ConnectionError>;
220    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
221        Pin::new(&mut self.connected).poll(cx).map(|_| {
222            let conn = self.conn.take().ok_or_else(|| {
223                error!("Connection not available when connecting future resolves");
224                ConnectionError::LocallyClosed
225            })?;
226            let inner = conn.state.lock("connecting");
227            if inner.connected {
228                drop(inner);
229                Ok(Connection(conn))
230            } else {
231                Err(inner.error.clone().unwrap_or_else(|| {
232                    ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
233                        "connection failed without error".to_string(),
234                    ))
235                }))
236            }
237        })
238    }
239}
240
241/// Future that completes when a connection is fully established
242///
243/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
244/// value is meaningless.
245pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
246
247impl Future for ZeroRttAccepted {
248    type Output = bool;
249    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
250        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
251    }
252}
253
254/// A future that drives protocol logic for a connection
255///
256/// This future handles the protocol logic for a single connection, routing events from the
257/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
258/// It also keeps track of outstanding timeouts for the `Connection`.
259///
260/// If the connection encounters an error condition, this future will yield an error. It will
261/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
262/// connection-related futures, this waits for the draining period to complete to ensure that
263/// packets still in flight from the peer are handled gracefully.
264#[must_use = "connection drivers must be spawned for their connections to function"]
265#[derive(Debug)]
266struct ConnectionDriver(ConnectionRef);
267
268impl Future for ConnectionDriver {
269    type Output = Result<(), io::Error>;
270
271    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
272        let conn = &mut *self.0.state.lock("poll");
273
274        let span = debug_span!("drive", id = conn.handle.0);
275        let _guard = span.enter();
276
277        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
278            conn.terminate(e, &self.0.shared);
279            return Poll::Ready(Ok(()));
280        }
281        let mut keep_going = conn.drive_transmit(cx)?;
282        // If a timer expires, there might be more to transmit. When we transmit something, we
283        // might need to reset a timer. Hence, we must loop until neither happens.
284        keep_going |= conn.drive_timer(cx);
285        conn.forward_endpoint_events();
286        conn.forward_app_events(&self.0.shared);
287
288        // Kick off automatic channel binding once connected, if configured
289        if conn.connected && !conn.binding_started {
290            if let Some(rt) = crate::trust::global_runtime() {
291                // Delay NEW_TOKEN until binding completes
292                conn.inner.set_delay_new_token_until_binding(true);
293
294                let hl_conn_server = Connection(self.0.clone());
295                let hl_conn_client = hl_conn_server.clone();
296                let store = rt.store.clone();
297                let policy = rt.policy.clone();
298                let signer = rt.local_signing_key.clone();
299                let spki = rt.local_spki.clone();
300                let runtime = conn.runtime.clone();
301
302                if conn.inner.side().is_server() {
303                    runtime.spawn(Box::pin(async move {
304                        match crate::trust::recv_verify_binding_ed25519(
305                            &hl_conn_server,
306                            &*store,
307                            &policy,
308                        )
309                        .await
310                        {
311                            Ok(peer) => {
312                                let mismatch = {
313                                    let mut state = hl_conn_server.0.state.lock("trust set peer");
314                                    let inner = &mut state.inner;
315                                    let expected = inner.expected_peer_id();
316                                    let mismatch = expected.is_some() && expected != Some(peer);
317                                    if !mismatch {
318                                        inner.set_token_binding_peer_id(peer);
319                                        inner.set_delay_new_token_until_binding(false);
320                                    }
321                                    mismatch
322                                };
323
324                                if mismatch {
325                                    hl_conn_server.close(0u32.into(), b"token peer mismatch");
326                                } else if let Err(_) =
327                                    crate::trust::persist_peer_id_global(Some(peer))
328                                {
329                                    hl_conn_server.close(0u32.into(), b"peer persist failed");
330                                }
331                            }
332                            Err(_e) => {
333                                hl_conn_server.close(0u32.into(), b"channel binding failed");
334                            }
335                        }
336                    }));
337                }
338
339                if conn.inner.side().is_client() {
340                    runtime.spawn(Box::pin(async move {
341                        if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
342                            let _ = crate::trust::send_binding_ed25519(
343                                &hl_conn_client,
344                                &exp,
345                                &signer,
346                                &spki,
347                            )
348                            .await;
349                        }
350                    }));
351                }
352
353                conn.binding_started = true;
354            }
355        }
356
357        if !conn.inner.is_drained() {
358            if keep_going {
359                // If the connection hasn't processed all tasks, schedule it again
360                cx.waker().wake_by_ref();
361            } else {
362                conn.driver = Some(cx.waker().clone());
363            }
364            return Poll::Pending;
365        }
366        if conn.error.is_none() {
367            unreachable!("drained connections always have an error");
368        }
369        Poll::Ready(Ok(()))
370    }
371}
372
373/// A QUIC connection.
374///
375/// If all references to a connection (including every clone of the `Connection` handle, streams of
376/// incoming streams, and the various stream types) have been dropped, then the connection will be
377/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
378/// connection explicitly by calling [`Connection::close()`].
379///
380/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
381/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
382/// application. [`Connection::close()`] describes in more detail how to gracefully close a
383/// connection without losing application data.
384///
385/// May be cloned to obtain another handle to the same connection.
386///
387/// [`Connection::close()`]: Connection::close
388#[derive(Debug, Clone)]
389pub struct Connection(ConnectionRef);
390
391impl Connection {
392    /// Initiate a new outgoing unidirectional stream.
393    ///
394    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
395    /// consequence, the peer won't be notified that a stream has been opened until the stream is
396    /// actually used.
397    pub fn open_uni(&self) -> OpenUni<'_> {
398        OpenUni {
399            conn: &self.0,
400            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
401        }
402    }
403
404    /// Initiate a new outgoing bidirectional stream.
405    ///
406    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
407    /// consequence, the peer won't be notified that a stream has been opened until the stream is
408    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
409    /// anything to [`SendStream`] will never succeed.
410    ///
411    /// [`open_bi()`]: Self::open_bi
412    /// [`SendStream`]: crate::SendStream
413    /// [`RecvStream`]: crate::RecvStream
414    pub fn open_bi(&self) -> OpenBi<'_> {
415        OpenBi {
416            conn: &self.0,
417            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
418        }
419    }
420
421    /// Accept the next incoming uni-directional stream
422    pub fn accept_uni(&self) -> AcceptUni<'_> {
423        AcceptUni {
424            conn: &self.0,
425            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
426        }
427    }
428
429    /// Accept the next incoming bidirectional stream
430    ///
431    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
432    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
433    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
434    ///
435    /// [`accept_bi()`]: Self::accept_bi
436    /// [`open_bi()`]: Self::open_bi
437    /// [`SendStream`]: crate::SendStream
438    /// [`RecvStream`]: crate::RecvStream
439    pub fn accept_bi(&self) -> AcceptBi<'_> {
440        AcceptBi {
441            conn: &self.0,
442            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
443        }
444    }
445
446    /// Receive an application datagram
447    pub fn read_datagram(&self) -> ReadDatagram<'_> {
448        ReadDatagram {
449            conn: &self.0,
450            notify: self.0.shared.datagram_received.notified(),
451        }
452    }
453
454    /// Wait for the connection to be closed for any reason
455    ///
456    /// Despite the return type's name, closed connections are often not an error condition at the
457    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
458    /// and [`ConnectionError::ApplicationClosed`].
459    pub async fn closed(&self) -> ConnectionError {
460        {
461            let conn = self.0.state.lock("closed");
462            if let Some(error) = conn.error.as_ref() {
463                return error.clone();
464            }
465            // Construct the future while the lock is held to ensure we can't miss a wakeup if
466            // the `Notify` is signaled immediately after we release the lock. `await` it after
467            // the lock guard is out of scope.
468            self.0.shared.closed.notified()
469        }
470        .await;
471        self.0
472            .state
473            .lock("closed")
474            .error
475            .as_ref()
476            .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
477            .clone()
478    }
479
480    /// If the connection is closed, the reason why.
481    ///
482    /// Returns `None` if the connection is still open.
483    pub fn close_reason(&self) -> Option<ConnectionError> {
484        self.0.state.lock("close_reason").error.clone()
485    }
486
487    /// Close the connection immediately.
488    ///
489    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
490    /// more data is sent to the peer and the peer may drop buffered data upon receiving
491    /// the CONNECTION_CLOSE frame.
492    ///
493    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
494    ///
495    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
496    /// is preserved in full, it should be kept under 1KiB.
497    ///
498    /// # Gracefully closing a connection
499    ///
500    /// Only the peer last receiving application data can be certain that all data is
501    /// delivered. The only reliable action it can then take is to close the connection,
502    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
503    /// frame is very likely if both endpoints stay online long enough, and
504    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
505    /// remote peer will time out the connection, provided that the idle timeout is not
506    /// disabled.
507    ///
508    /// The sending side can not guarantee all stream data is delivered to the remote
509    /// application. It only knows the data is delivered to the QUIC stack of the remote
510    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
511    /// [`close()`] the remote endpoint may drop any data it received but is as yet
512    /// undelivered to the application, including data that was acknowledged as received to
513    /// the local endpoint.
514    ///
515    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
516    /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
517    /// [`close()`]: Connection::close
518    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
519        let conn = &mut *self.0.state.lock("close");
520        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
521    }
522
523    /// Transmit `data` as an unreliable, unordered application datagram
524    ///
525    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
526    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
527    /// dictated by the peer.
528    ///
529    /// Previously queued datagrams which are still unsent may be discarded to make space for this
530    /// datagram, in order of oldest to newest.
531    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
532        let conn = &mut *self.0.state.lock("send_datagram");
533        if let Some(ref x) = conn.error {
534            return Err(SendDatagramError::ConnectionLost(x.clone()));
535        }
536        use crate::SendDatagramError::*;
537        match conn.inner.datagrams().send(data, true) {
538            Ok(()) => {
539                conn.wake();
540                Ok(())
541            }
542            Err(e) => Err(match e {
543                Blocked(..) => unreachable!(),
544                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
545                Disabled => SendDatagramError::Disabled,
546                TooLarge => SendDatagramError::TooLarge,
547            }),
548        }
549    }
550
551    /// Transmit `data` as an unreliable, unordered application datagram
552    ///
553    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
554    /// conditions, which effectively prioritizes old datagrams over new datagrams.
555    ///
556    /// See [`send_datagram()`] for details.
557    ///
558    /// [`send_datagram()`]: Connection::send_datagram
559    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
560        SendDatagram {
561            conn: &self.0,
562            data: Some(data),
563            notify: self.0.shared.datagrams_unblocked.notified(),
564        }
565    }
566
567    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
568    ///
569    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
570    ///
571    /// This may change over the lifetime of a connection according to variation in the path MTU
572    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
573    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
574    ///
575    /// Not necessarily the maximum size of received datagrams.
576    ///
577    /// [`send_datagram()`]: Connection::send_datagram
578    pub fn max_datagram_size(&self) -> Option<usize> {
579        self.0
580            .state
581            .lock("max_datagram_size")
582            .inner
583            .datagrams()
584            .max_size()
585    }
586
587    /// Bytes available in the outgoing datagram buffer
588    ///
589    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
590    /// at most this size is guaranteed not to cause older datagrams to be dropped.
591    pub fn datagram_send_buffer_space(&self) -> usize {
592        self.0
593            .state
594            .lock("datagram_send_buffer_space")
595            .inner
596            .datagrams()
597            .send_buffer_space()
598    }
599
600    /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
601    pub fn send_nat_address_advertisement(
602        &self,
603        address: SocketAddr,
604        priority: u32,
605    ) -> Result<u64, crate::ConnectionError> {
606        let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
607        conn.inner.send_nat_address_advertisement(address, priority)
608    }
609
610    /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
611    pub fn send_nat_punch_coordination(
612        &self,
613        paired_with_sequence_number: u64,
614        address: SocketAddr,
615        round: u32,
616    ) -> Result<(), crate::ConnectionError> {
617        let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
618        conn.inner
619            .send_nat_punch_coordination(paired_with_sequence_number, address, round)
620    }
621
622    /// The side of the connection (client or server)
623    pub fn side(&self) -> Side {
624        self.0.state.lock("side").inner.side()
625    }
626
627    /// The peer's UDP address
628    ///
629    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
630    /// switching to a cellular internet connection.
631    pub fn remote_address(&self) -> SocketAddr {
632        self.0.state.lock("remote_address").inner.remote_address()
633    }
634
635    /// The local IP address which was used when the peer established
636    /// the connection
637    ///
638    /// This can be different from the address the endpoint is bound to, in case
639    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
640    ///
641    /// This will return `None` for clients, or when the platform does not expose this
642    /// information. See quinn_udp's RecvMeta::dst_ip for a list of
643    /// supported platforms when using quinn_udp for I/O, which is the default.
644    pub fn local_ip(&self) -> Option<IpAddr> {
645        self.0.state.lock("local_ip").inner.local_ip()
646    }
647
648    /// Current best estimate of this connection's latency (round-trip-time)
649    pub fn rtt(&self) -> Duration {
650        self.0.state.lock("rtt").inner.rtt()
651    }
652
653    /// Returns connection statistics
654    pub fn stats(&self) -> ConnectionStats {
655        self.0.state.lock("stats").inner.stats()
656    }
657
658    /// Current state of the congestion control algorithm, for debugging purposes
659    pub fn congestion_state(&self) -> Box<dyn Controller> {
660        self.0
661            .state
662            .lock("congestion_state")
663            .inner
664            .congestion_state()
665            .clone_box()
666    }
667
668    /// Parameters negotiated during the handshake
669    ///
670    /// Guaranteed to return `Some` on fully established connections or after
671    /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
672    /// the returned value.
673    ///
674    /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
675    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
676        self.0
677            .state
678            .lock("handshake_data")
679            .inner
680            .crypto_session()
681            .handshake_data()
682    }
683
684    /// Cryptographic identity of the peer
685    ///
686    /// The dynamic type returned is determined by the configured
687    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
688    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
689    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
690        self.0
691            .state
692            .lock("peer_identity")
693            .inner
694            .crypto_session()
695            .peer_identity()
696    }
697
698    /// A stable identifier for this connection
699    ///
700    /// Peer addresses and connection IDs can change, but this value will remain
701    /// fixed for the lifetime of the connection.
702    pub fn stable_id(&self) -> usize {
703        self.0.stable_id()
704    }
705
706    /// Returns true if this connection negotiated post-quantum settings.
707    ///
708    /// This reflects either explicit PQC algorithms advertised via transport
709    /// parameters or in-band detection from handshake CRYPTO frames.
710    pub fn is_pqc(&self) -> bool {
711        let state = self.0.state.lock("is_pqc");
712        state.inner.is_pqc()
713    }
714
715    /// Debug-only hint: returns true when the underlying TLS provider was
716    /// configured to run in KEM-only (ML‑KEM) mode. This is a diagnostic aid
717    /// for tests and does not itself guarantee group enforcement.
718    pub fn debug_kem_only(&self) -> bool {
719        #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
720        {
721            crate::crypto::rustls::debug_kem_only_enabled()
722        }
723        #[cfg(not(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring")))]
724        {
725            false
726        }
727    }
728
729    /// Update traffic keys spontaneously
730    ///
731    /// This primarily exists for testing purposes.
732    pub fn force_key_update(&self) {
733        self.0
734            .state
735            .lock("force_key_update")
736            .inner
737            .force_key_update()
738    }
739
740    /// Derive keying material from this connection's TLS session secrets.
741    ///
742    /// When both peers call this method with the same `label` and `context`
743    /// arguments and `output` buffers of equal length, they will get the
744    /// same sequence of bytes in `output`. These bytes are cryptographically
745    /// strong and pseudorandom, and are suitable for use as keying material.
746    ///
747    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
748    pub fn export_keying_material(
749        &self,
750        output: &mut [u8],
751        label: &[u8],
752        context: &[u8],
753    ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
754        self.0
755            .state
756            .lock("export_keying_material")
757            .inner
758            .crypto_session()
759            .export_keying_material(output, label, context)
760    }
761
762    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
763    ///
764    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
765    /// `count`s increase both minimum and worst-case memory consumption.
766    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
767        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
768        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
769        // May need to send MAX_STREAMS to make progress
770        conn.wake();
771    }
772
773    /// See [`crate::TransportConfig::receive_window()`]
774    pub fn set_receive_window(&self, receive_window: VarInt) {
775        let mut conn = self.0.state.lock("set_receive_window");
776        conn.inner.set_receive_window(receive_window);
777        conn.wake();
778    }
779
780    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
781    ///
782    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
783    /// `count`s increase both minimum and worst-case memory consumption.
784    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
785        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
786        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
787        // May need to send MAX_STREAMS to make progress
788        conn.wake();
789    }
790
791    /// Set up qlog for this connection.
792    #[cfg(feature = "__qlog")]
793    pub fn set_qlog(
794        &mut self,
795        writer: Box<dyn std::io::Write + Send + Sync>,
796        title: Option<String>,
797        description: Option<String>,
798    ) {
799        let mut state = self.0.state.lock("__qlog");
800        state
801            .inner
802            .set_qlog(writer, title, description, Instant::now());
803    }
804}
805
806pin_project! {
807    /// Future produced by [`Connection::open_uni`]
808    pub struct OpenUni<'a> {
809        conn: &'a ConnectionRef,
810        #[pin]
811        notify: Notified<'a>,
812    }
813}
814
815impl Future for OpenUni<'_> {
816    type Output = Result<SendStream, ConnectionError>;
817    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
818        let this = self.project();
819        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
820        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
821    }
822}
823
824pin_project! {
825    /// Future produced by [`Connection::open_bi`]
826    pub struct OpenBi<'a> {
827        conn: &'a ConnectionRef,
828        #[pin]
829        notify: Notified<'a>,
830    }
831}
832
833impl Future for OpenBi<'_> {
834    type Output = Result<(SendStream, RecvStream), ConnectionError>;
835    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
836        let this = self.project();
837        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
838
839        Poll::Ready(Ok((
840            SendStream::new(conn.clone(), id, is_0rtt),
841            RecvStream::new(conn, id, is_0rtt),
842        )))
843    }
844}
845
846fn poll_open<'a>(
847    ctx: &mut Context<'_>,
848    conn: &'a ConnectionRef,
849    mut notify: Pin<&mut Notified<'a>>,
850    dir: Dir,
851) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
852    let mut state = conn.state.lock("poll_open");
853    if let Some(ref e) = state.error {
854        return Poll::Ready(Err(e.clone()));
855    } else if let Some(id) = state.inner.streams().open(dir) {
856        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
857        drop(state); // Release the lock so clone can take it
858        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
859    }
860    loop {
861        match notify.as_mut().poll(ctx) {
862            // `state` lock ensures we didn't race with readiness
863            Poll::Pending => return Poll::Pending,
864            // Spurious wakeup, get a new future
865            Poll::Ready(()) => {
866                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
867            }
868        }
869    }
870}
871
872pin_project! {
873    /// Future produced by [`Connection::accept_uni`]
874    pub struct AcceptUni<'a> {
875        conn: &'a ConnectionRef,
876        #[pin]
877        notify: Notified<'a>,
878    }
879}
880
881impl Future for AcceptUni<'_> {
882    type Output = Result<RecvStream, ConnectionError>;
883
884    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
885        let this = self.project();
886        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
887        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
888    }
889}
890
891pin_project! {
892    /// Future produced by [`Connection::accept_bi`]
893    pub struct AcceptBi<'a> {
894        conn: &'a ConnectionRef,
895        #[pin]
896        notify: Notified<'a>,
897    }
898}
899
900impl Future for AcceptBi<'_> {
901    type Output = Result<(SendStream, RecvStream), ConnectionError>;
902
903    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
904        let this = self.project();
905        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
906        Poll::Ready(Ok((
907            SendStream::new(conn.clone(), id, is_0rtt),
908            RecvStream::new(conn, id, is_0rtt),
909        )))
910    }
911}
912
913fn poll_accept<'a>(
914    ctx: &mut Context<'_>,
915    conn: &'a ConnectionRef,
916    mut notify: Pin<&mut Notified<'a>>,
917    dir: Dir,
918) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
919    let mut state = conn.state.lock("poll_accept");
920    // Check for incoming streams before checking `state.error` so that already-received streams,
921    // which are necessarily finite, can be drained from a closed connection.
922    if let Some(id) = state.inner.streams().accept(dir) {
923        let is_0rtt = state.inner.is_handshaking();
924        state.wake(); // To send additional stream ID credit
925        drop(state); // Release the lock so clone can take it
926        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
927    } else if let Some(ref e) = state.error {
928        return Poll::Ready(Err(e.clone()));
929    }
930    loop {
931        match notify.as_mut().poll(ctx) {
932            // `state` lock ensures we didn't race with readiness
933            Poll::Pending => return Poll::Pending,
934            // Spurious wakeup, get a new future
935            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
936        }
937    }
938}
939
940pin_project! {
941    /// Future produced by [`Connection::read_datagram`]
942    pub struct ReadDatagram<'a> {
943        conn: &'a ConnectionRef,
944        #[pin]
945        notify: Notified<'a>,
946    }
947}
948
949impl Future for ReadDatagram<'_> {
950    type Output = Result<Bytes, ConnectionError>;
951    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
952        let mut this = self.project();
953        let mut state = this.conn.state.lock("ReadDatagram::poll");
954        // Check for buffered datagrams before checking `state.error` so that already-received
955        // datagrams, which are necessarily finite, can be drained from a closed connection.
956        match state.inner.datagrams().recv() {
957            Some(x) => {
958                return Poll::Ready(Ok(x));
959            }
960            _ => {
961                if let Some(ref e) = state.error {
962                    return Poll::Ready(Err(e.clone()));
963                }
964            }
965        }
966        loop {
967            match this.notify.as_mut().poll(ctx) {
968                // `state` lock ensures we didn't race with readiness
969                Poll::Pending => return Poll::Pending,
970                // Spurious wakeup, get a new future
971                Poll::Ready(()) => this
972                    .notify
973                    .set(this.conn.shared.datagram_received.notified()),
974            }
975        }
976    }
977}
978
979pin_project! {
980    /// Future produced by [`Connection::send_datagram_wait`]
981    pub struct SendDatagram<'a> {
982        conn: &'a ConnectionRef,
983        data: Option<Bytes>,
984        #[pin]
985        notify: Notified<'a>,
986    }
987}
988
989impl Future for SendDatagram<'_> {
990    type Output = Result<(), SendDatagramError>;
991    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
992        let mut this = self.project();
993        let mut state = this.conn.state.lock("SendDatagram::poll");
994        if let Some(ref e) = state.error {
995            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
996        }
997        use crate::SendDatagramError::*;
998        match state.inner.datagrams().send(
999            this.data.take().ok_or_else(|| {
1000                error!("SendDatagram future polled without data");
1001                SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
1002            })?,
1003            false,
1004        ) {
1005            Ok(()) => {
1006                state.wake();
1007                Poll::Ready(Ok(()))
1008            }
1009            Err(e) => Poll::Ready(Err(match e {
1010                Blocked(data) => {
1011                    this.data.replace(data);
1012                    loop {
1013                        match this.notify.as_mut().poll(ctx) {
1014                            Poll::Pending => return Poll::Pending,
1015                            // Spurious wakeup, get a new future
1016                            Poll::Ready(()) => this
1017                                .notify
1018                                .set(this.conn.shared.datagrams_unblocked.notified()),
1019                        }
1020                    }
1021                }
1022                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1023                Disabled => SendDatagramError::Disabled,
1024                TooLarge => SendDatagramError::TooLarge,
1025            })),
1026        }
1027    }
1028}
1029
1030#[derive(Debug)]
1031pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1032
1033impl ConnectionRef {
1034    #[allow(clippy::too_many_arguments)]
1035    fn new(
1036        handle: ConnectionHandle,
1037        conn: crate::Connection,
1038        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1039        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1040        on_handshake_data: oneshot::Sender<()>,
1041        on_connected: oneshot::Sender<bool>,
1042        socket: Arc<dyn AsyncUdpSocket>,
1043        runtime: Arc<dyn Runtime>,
1044    ) -> Self {
1045        Self(Arc::new(ConnectionInner {
1046            state: Mutex::new(State {
1047                inner: conn,
1048                driver: None,
1049                handle,
1050                on_handshake_data: Some(on_handshake_data),
1051                on_connected: Some(on_connected),
1052                connected: false,
1053                timer: None,
1054                timer_deadline: None,
1055                conn_events,
1056                endpoint_events,
1057                blocked_writers: FxHashMap::default(),
1058                blocked_readers: FxHashMap::default(),
1059                stopped: FxHashMap::default(),
1060                error: None,
1061                ref_count: 0,
1062                io_poller: socket.clone().create_io_poller(),
1063                socket,
1064                runtime,
1065                send_buffer: Vec::new(),
1066                buffered_transmit: None,
1067                binding_started: false,
1068            }),
1069            shared: Shared::default(),
1070        }))
1071    }
1072
1073    fn stable_id(&self) -> usize {
1074        &*self.0 as *const _ as usize
1075    }
1076}
1077
1078impl Clone for ConnectionRef {
1079    fn clone(&self) -> Self {
1080        self.state.lock("clone").ref_count += 1;
1081        Self(self.0.clone())
1082    }
1083}
1084
1085impl Drop for ConnectionRef {
1086    fn drop(&mut self) {
1087        let conn = &mut *self.state.lock("drop");
1088        if let Some(x) = conn.ref_count.checked_sub(1) {
1089            conn.ref_count = x;
1090            if x == 0 && !conn.inner.is_closed() {
1091                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1092                // not, we can't do any harm. If there were any streams being opened, then either
1093                // the connection will be closed for an unrelated reason or a fresh reference will
1094                // be constructed for the newly opened stream.
1095                conn.implicit_close(&self.shared);
1096            }
1097        }
1098    }
1099}
1100
1101impl std::ops::Deref for ConnectionRef {
1102    type Target = ConnectionInner;
1103    fn deref(&self) -> &Self::Target {
1104        &self.0
1105    }
1106}
1107
1108#[derive(Debug)]
1109pub(crate) struct ConnectionInner {
1110    pub(crate) state: Mutex<State>,
1111    pub(crate) shared: Shared,
1112}
1113
1114#[derive(Debug, Default)]
1115pub(crate) struct Shared {
1116    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1117    /// control budget
1118    stream_budget_available: [Notify; 2],
1119    /// Notified when the peer has initiated a new stream
1120    stream_incoming: [Notify; 2],
1121    datagram_received: Notify,
1122    datagrams_unblocked: Notify,
1123    closed: Notify,
1124}
1125
1126pub(crate) struct State {
1127    pub(crate) inner: crate::Connection,
1128    driver: Option<Waker>,
1129    handle: ConnectionHandle,
1130    on_handshake_data: Option<oneshot::Sender<()>>,
1131    on_connected: Option<oneshot::Sender<bool>>,
1132    connected: bool,
1133    timer: Option<Pin<Box<dyn AsyncTimer>>>,
1134    timer_deadline: Option<Instant>,
1135    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1136    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1137    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1138    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1139    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1140    /// Always set to Some before the connection becomes drained
1141    pub(crate) error: Option<ConnectionError>,
1142    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1143    ref_count: usize,
1144    socket: Arc<dyn AsyncUdpSocket>,
1145    io_poller: Pin<Box<dyn UdpPoller>>,
1146    runtime: Arc<dyn Runtime>,
1147    send_buffer: Vec<u8>,
1148    /// We buffer a transmit when the underlying I/O would block
1149    buffered_transmit: Option<crate::Transmit>,
1150    /// True once we've initiated automatic channel binding (if enabled)
1151    binding_started: bool,
1152}
1153
1154impl State {
1155    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1156        let now = self.runtime.now();
1157        let mut transmits = 0;
1158
1159        let max_datagrams = self
1160            .socket
1161            .max_transmit_segments()
1162            .min(MAX_TRANSMIT_SEGMENTS);
1163
1164        loop {
1165            // Retry the last transmit, or get a new one.
1166            let t = match self.buffered_transmit.take() {
1167                Some(t) => t,
1168                None => {
1169                    self.send_buffer.clear();
1170                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
1171                    match self
1172                        .inner
1173                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1174                    {
1175                        Some(t) => {
1176                            transmits += match t.segment_size {
1177                                None => 1,
1178                                Some(s) => t.size.div_ceil(s), // round up
1179                            };
1180                            t
1181                        }
1182                        None => break,
1183                    }
1184                }
1185            };
1186
1187            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1188                // Retry after a future wakeup
1189                self.buffered_transmit = Some(t);
1190                return Ok(false);
1191            }
1192
1193            let len = t.size;
1194            let retry = match self
1195                .socket
1196                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1197            {
1198                Ok(()) => false,
1199                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1200                Err(e) => return Err(e),
1201            };
1202            if retry {
1203                // We thought the socket was writable, but it wasn't. Retry so that either another
1204                // `poll_writable` call determines that the socket is indeed not writable and
1205                // registers us for a wakeup, or the send succeeds if this really was just a
1206                // transient failure.
1207                self.buffered_transmit = Some(t);
1208                continue;
1209            }
1210
1211            if transmits >= MAX_TRANSMIT_DATAGRAMS {
1212                // TODO: What isn't ideal here yet is that if we don't poll all
1213                // datagrams that could be sent we don't go into the `app_limited`
1214                // state and CWND continues to grow until we get here the next time.
1215                // See https://github.com/quinn-rs/quinn/issues/1126
1216                return Ok(true);
1217            }
1218        }
1219
1220        Ok(false)
1221    }
1222
1223    fn forward_endpoint_events(&mut self) {
1224        while let Some(event) = self.inner.poll_endpoint_events() {
1225            // If the endpoint driver is gone, noop.
1226            let _ = self.endpoint_events.send((self.handle, event));
1227        }
1228    }
1229
1230    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1231    fn process_conn_events(
1232        &mut self,
1233        shared: &Shared,
1234        cx: &mut Context,
1235    ) -> Result<(), ConnectionError> {
1236        loop {
1237            match self.conn_events.poll_recv(cx) {
1238                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1239                    self.socket = socket;
1240                    self.io_poller = self.socket.clone().create_io_poller();
1241                    self.inner.local_address_changed();
1242                }
1243                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1244                    self.inner.handle_event(event);
1245                }
1246                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1247                    self.close(error_code, reason, shared);
1248                }
1249                Poll::Ready(None) => {
1250                    return Err(ConnectionError::TransportError(crate::TransportError {
1251                        code: crate::TransportErrorCode::INTERNAL_ERROR,
1252                        frame: None,
1253                        reason: "endpoint driver future was dropped".to_string(),
1254                    }));
1255                }
1256                Poll::Pending => {
1257                    return Ok(());
1258                }
1259            }
1260        }
1261    }
1262
1263    fn forward_app_events(&mut self, shared: &Shared) {
1264        while let Some(event) = self.inner.poll() {
1265            use crate::Event::*;
1266            match event {
1267                HandshakeDataReady => {
1268                    if let Some(x) = self.on_handshake_data.take() {
1269                        let _ = x.send(());
1270                    }
1271                }
1272                Connected => {
1273                    self.connected = true;
1274                    if let Some(x) = self.on_connected.take() {
1275                        // We don't care if the on-connected future was dropped
1276                        let _ = x.send(self.inner.accepted_0rtt());
1277                    }
1278                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1279                        // Wake up rejected 0-RTT streams so they can fail immediately with
1280                        // `ZeroRttRejected` errors.
1281                        wake_all(&mut self.blocked_writers);
1282                        wake_all(&mut self.blocked_readers);
1283                        wake_all_notify(&mut self.stopped);
1284                    }
1285                }
1286                ConnectionLost { reason } => {
1287                    self.terminate(reason, shared);
1288                }
1289                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1290                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1291                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1292                }
1293                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1294                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1295                }
1296                DatagramReceived => {
1297                    shared.datagram_received.notify_waiters();
1298                }
1299                DatagramsUnblocked => {
1300                    shared.datagrams_unblocked.notify_waiters();
1301                }
1302                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1303                Stream(StreamEvent::Available { dir }) => {
1304                    // Might mean any number of streams are ready, so we wake up everyone
1305                    shared.stream_budget_available[dir as usize].notify_waiters();
1306                }
1307                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1308                Stream(StreamEvent::Stopped { id, .. }) => {
1309                    wake_stream_notify(id, &mut self.stopped);
1310                    wake_stream(id, &mut self.blocked_writers);
1311                }
1312            }
1313        }
1314    }
1315
1316    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1317        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1318        // timer is registered with the runtime (and check whether it's already
1319        // expired).
1320        match self.inner.poll_timeout() {
1321            Some(deadline) => {
1322                if let Some(delay) = &mut self.timer {
1323                    // There is no need to reset the tokio timer if the deadline
1324                    // did not change
1325                    if self
1326                        .timer_deadline
1327                        .map(|current_deadline| current_deadline != deadline)
1328                        .unwrap_or(true)
1329                    {
1330                        delay.as_mut().reset(deadline);
1331                    }
1332                } else {
1333                    self.timer = Some(self.runtime.new_timer(deadline));
1334                }
1335                // Store the actual expiration time of the timer
1336                self.timer_deadline = Some(deadline);
1337            }
1338            None => {
1339                self.timer_deadline = None;
1340                return false;
1341            }
1342        }
1343
1344        if self.timer_deadline.is_none() {
1345            return false;
1346        }
1347
1348        let delay = match self.timer.as_mut() {
1349            Some(timer) => timer.as_mut(),
1350            None => {
1351                error!("Timer missing in state where it should exist");
1352                return false;
1353            }
1354        };
1355        if delay.poll(cx).is_pending() {
1356            // Since there wasn't a timeout event, there is nothing new
1357            // for the connection to do
1358            return false;
1359        }
1360
1361        // A timer expired, so the caller needs to check for
1362        // new transmits, which might cause new timers to be set.
1363        self.inner.handle_timeout(self.runtime.now());
1364        self.timer_deadline = None;
1365        true
1366    }
1367
1368    /// Wake up a blocked `Driver` task to process I/O
1369    pub(crate) fn wake(&mut self) {
1370        if let Some(x) = self.driver.take() {
1371            x.wake();
1372        }
1373    }
1374
1375    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1376    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1377        self.error = Some(reason.clone());
1378        if let Some(x) = self.on_handshake_data.take() {
1379            let _ = x.send(());
1380        }
1381        wake_all(&mut self.blocked_writers);
1382        wake_all(&mut self.blocked_readers);
1383        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1384        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1385        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1386        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1387        shared.datagram_received.notify_waiters();
1388        shared.datagrams_unblocked.notify_waiters();
1389        if let Some(x) = self.on_connected.take() {
1390            let _ = x.send(false);
1391        }
1392        wake_all_notify(&mut self.stopped);
1393        shared.closed.notify_waiters();
1394    }
1395
1396    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1397        self.inner.close(self.runtime.now(), error_code, reason);
1398        self.terminate(ConnectionError::LocallyClosed, shared);
1399        self.wake();
1400    }
1401
1402    /// Close for a reason other than the application's explicit request
1403    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1404        self.close(0u32.into(), Bytes::new(), shared);
1405    }
1406
1407    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1408        if self.inner.is_handshaking()
1409            || self.inner.accepted_0rtt()
1410            || self.inner.side().is_server()
1411        {
1412            Ok(())
1413        } else {
1414            Err(())
1415        }
1416    }
1417}
1418
1419impl Drop for State {
1420    fn drop(&mut self) {
1421        if !self.inner.is_drained() {
1422            // Ensure the endpoint can tidy up
1423            let _ = self
1424                .endpoint_events
1425                .send((self.handle, crate::EndpointEvent::drained()));
1426        }
1427    }
1428}
1429
1430impl fmt::Debug for State {
1431    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1432        f.debug_struct("State").field("inner", &self.inner).finish()
1433    }
1434}
1435
1436fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1437    if let Some(waker) = wakers.remove(&stream_id) {
1438        waker.wake();
1439    }
1440}
1441
1442fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1443    wakers.drain().for_each(|(_, waker)| waker.wake())
1444}
1445
1446fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1447    if let Some(notify) = wakers.remove(&stream_id) {
1448        notify.notify_waiters()
1449    }
1450}
1451
1452fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1453    wakers
1454        .drain()
1455        .for_each(|(_, notify)| notify.notify_waiters())
1456}
1457
1458/// Errors that can arise when sending a datagram
1459#[derive(Debug, Error, Clone, Eq, PartialEq)]
1460pub enum SendDatagramError {
1461    /// The peer does not support receiving datagram frames
1462    #[error("datagrams not supported by peer")]
1463    UnsupportedByPeer,
1464    /// Datagram support is disabled locally
1465    #[error("datagram support disabled")]
1466    Disabled,
1467    /// The datagram is larger than the connection can currently accommodate
1468    ///
1469    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1470    /// exceeded.
1471    #[error("datagram too large")]
1472    TooLarge,
1473    /// The connection was lost
1474    #[error("connection lost")]
1475    ConnectionLost(#[from] ConnectionError),
1476}
1477
1478/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
1479///
1480/// This limits the amount of CPU resources consumed by datagram generation,
1481/// and allows other tasks (like receiving ACKs) to run in between.
1482const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1483
1484/// The maximum amount of datagrams that are sent in a single transmit
1485///
1486/// This can be lower than the maximum platform capabilities, to avoid excessive
1487/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1488/// that numbers around 10 are a good compromise.
1489const MAX_TRANSMIT_SEGMENTS: usize = 10;