ant_quic/high_level/connection.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9 any::Any,
10 fmt,
11 future::Future,
12 io,
13 net::{IpAddr, SocketAddr},
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll, Waker, ready},
17};
18
19use bytes::Bytes;
20use pin_project_lite::pin_project;
21use rustc_hash::FxHashMap;
22use thiserror::Error;
23use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
24use tracing::{Instrument, Span, debug_span, error};
25
26use super::{
27 ConnectionEvent,
28 mutex::Mutex,
29 recv_stream::RecvStream,
30 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
31 send_stream::SendStream,
32 udp_transmit,
33};
34use crate::{
35 ConnectError, ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent,
36 Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
37};
38
39/// In-progress connection attempt future
40#[derive(Debug)]
41pub struct Connecting {
42 conn: Option<ConnectionRef>,
43 connected: oneshot::Receiver<bool>,
44 handshake_data_ready: Option<oneshot::Receiver<()>>,
45}
46
47impl Connecting {
48 pub(crate) fn new(
49 handle: ConnectionHandle,
50 conn: crate::Connection,
51 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53 socket: Arc<dyn AsyncUdpSocket>,
54 runtime: Arc<dyn Runtime>,
55 ) -> Self {
56 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57 let (on_connected_send, on_connected_recv) = oneshot::channel();
58 let conn = ConnectionRef::new(
59 handle,
60 conn,
61 endpoint_events,
62 conn_events,
63 on_handshake_data_send,
64 on_connected_send,
65 socket,
66 runtime.clone(),
67 );
68
69 let driver = ConnectionDriver(conn.clone());
70 runtime.spawn(Box::pin(
71 async {
72 if let Err(e) = driver.await {
73 tracing::error!("I/O error: {e}");
74 }
75 }
76 .instrument(Span::current()),
77 ));
78
79 Self {
80 conn: Some(conn),
81 connected: on_connected_recv,
82 handshake_data_ready: Some(on_handshake_data_recv),
83 }
84 }
85
86 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87 ///
88 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89 /// If so, the returned [`Connection`] can be used to send application data without waiting for
90 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92 /// complete, at which point subsequently opened streams and written data will have full
93 /// cryptographic protection.
94 ///
95 /// ## Outgoing
96 ///
97 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103 /// If it was rejected, the existence of streams opened and other application data sent prior
104 /// to the handshake completing will not be conveyed to the remote application, and local
105 /// operations on them will return `ZeroRttRejected` errors.
106 ///
107 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108 /// relevant resumption state to be stored in the server, which servers may limit or lose for
109 /// various reasons including not persisting resumption state across server restarts.
110 ///
111 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112 /// implementation's docs for 0-RTT pitfalls.
113 ///
114 /// ## Incoming
115 ///
116 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118 ///
119 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120 /// implementation's docs for 0-RTT pitfalls.
121 ///
122 /// ## Security
123 ///
124 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125 /// replay attacks, and should therefore never be used to trigger non-idempotent operations.
126 ///
127 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128 /// before TLS client authentication has occurred, and should therefore not be used to send
129 /// data that relies on the client having been authenticated.
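    ///
    /// ## Example
    ///
    /// A minimal, illustrative sketch of attempting 0-RTT on an outgoing connection; the
    /// `connecting` value is assumed to come from an `Endpoint::connect` call elsewhere, and
    /// error handling is elided:
    ///
    /// ```ignore
    /// match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // Data sent on `connection` here is 0-RTT and replayable: only use it for
    ///         // idempotent requests.
    ///         if !zero_rtt_accepted.await {
    ///             // 0-RTT was rejected; early streams fail with `ZeroRttRejected` errors.
    ///         }
    ///     }
    ///     Err(connecting) => {
    ///         // 0-RTT unavailable; fall back to completing the handshake normally.
    ///         let connection = connecting.await?;
    ///     }
    /// }
    /// ```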
130 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132 // have to release it explicitly before returning `self` by value.
133 let conn = match self.conn.as_mut() {
134 Some(conn) => conn.state.lock("into_0rtt"),
135 None => {
136 return Err(self);
137 }
138 };
139
140 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
141 drop(conn);
142
143 if is_ok {
144 match self.conn.take() {
145 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
146 None => {
147 tracing::error!("Connection state missing during 0-RTT acceptance");
148 Err(self)
149 }
150 }
151 } else {
152 Err(self)
153 }
154 }
155
156 /// Parameters negotiated during the handshake
157 ///
158 /// The dynamic type returned is determined by the configured
159 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
160 /// be [`downcast`](Box::downcast) to a
161 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
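    ///
    /// A hedged sketch, assuming the default `rustls` session; the `protocol` field name
    /// mirrors the upstream quinn `HandshakeData` layout and is an assumption here:
    ///
    /// ```ignore
    /// let data = connecting.handshake_data().await?;
    /// if let Ok(data) = data.downcast::<crate::crypto::rustls::HandshakeData>() {
    ///     println!("negotiated ALPN: {:?}", data.protocol);
    /// }
    /// ```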
162 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
163 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
164 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
165 // simple.
166 if let Some(x) = self.handshake_data_ready.take() {
167 let _ = x.await;
168 }
169 let conn = self.conn.as_ref().ok_or_else(|| {
170 tracing::error!("Connection state missing while retrieving handshake data");
171 ConnectionError::LocallyClosed
172 })?;
173 let inner = conn.state.lock("handshake");
174 inner
175 .inner
176 .crypto_session()
177 .handshake_data()
178 .ok_or_else(|| {
179 inner.error.clone().unwrap_or_else(|| {
180 error!("Spurious handshake data ready notification with no error");
181 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
182 "Spurious handshake notification".to_string(),
183 ))
184 })
185 })
186 }
187
188 /// The local IP address which was used when the peer established
189 /// the connection
190 ///
191 /// This can be different from the address the endpoint is bound to, in case
192 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
193 ///
194 /// This will return `None` for clients, or when the platform does not expose this
195 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
196 /// supported platforms when using quinn_udp for I/O, which is the default.
197 ///
198 /// Returns `None` if called after `poll` has returned `Ready`.
199 pub fn local_ip(&self) -> Option<IpAddr> {
200 let conn = self.conn.as_ref()?;
201 let inner = conn.state.lock("local_ip");
202
203 inner.inner.local_ip()
204 }
205
206 /// The peer's UDP address
207 ///
208 /// Returns an error if called after `poll` has returned `Ready`.
209 pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
210 let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
211 error!("Connection used after yielding Ready");
212 ConnectionError::LocallyClosed
213 })?;
214 Ok(conn_ref.state.lock("remote_address").inner.remote_address())
215 }
216}
217
218impl Future for Connecting {
219 type Output = Result<Connection, ConnectionError>;
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
221 Pin::new(&mut self.connected).poll(cx).map(|_| {
222 let conn = self.conn.take().ok_or_else(|| {
223 error!("Connection not available when connecting future resolves");
224 ConnectionError::LocallyClosed
225 })?;
226 let inner = conn.state.lock("connecting");
227 if inner.connected {
228 drop(inner);
229 Ok(Connection(conn))
230 } else {
231 Err(inner.error.clone().unwrap_or_else(|| {
232 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
233 "connection failed without error".to_string(),
234 ))
235 }))
236 }
237 })
238 }
239}
240
241/// Future that completes when a connection is fully established
242///
243/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
244/// value is meaningless.
245pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
246
247impl Future for ZeroRttAccepted {
248 type Output = bool;
249 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
250 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
251 }
252}
253
254/// A future that drives protocol logic for a connection
255///
256/// This future handles the protocol logic for a single connection, routing events from the
257/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
258/// It also keeps track of outstanding timeouts for the `Connection`.
259///
260/// If the connection encounters an error condition, this future will yield an error. It will
261/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
262/// connection-related futures, this waits for the draining period to complete to ensure that
263/// packets still in flight from the peer are handled gracefully.
264#[must_use = "connection drivers must be spawned for their connections to function"]
265#[derive(Debug)]
266struct ConnectionDriver(ConnectionRef);
267
268impl Future for ConnectionDriver {
269 type Output = Result<(), io::Error>;
270
271 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
272 let conn = &mut *self.0.state.lock("poll");
273
274 let span = debug_span!("drive", id = conn.handle.0);
275 let _guard = span.enter();
276
277 if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
278 conn.terminate(e, &self.0.shared);
279 return Poll::Ready(Ok(()));
280 }
281 let mut keep_going = conn.drive_transmit(cx)?;
282 // If a timer expires, there might be more to transmit. When we transmit something, we
283 // might need to reset a timer. Hence, we must loop until neither happens.
284 keep_going |= conn.drive_timer(cx);
285 conn.forward_endpoint_events();
286 conn.forward_app_events(&self.0.shared);
287
288 // Kick off automatic channel binding once connected, if configured
289 if conn.connected && !conn.binding_started {
290 if let Some(rt) = crate::trust::global_runtime() {
291 // Delay NEW_TOKEN until binding completes
292 conn.inner.set_delay_new_token_until_binding(true);
293
294 let hl_conn_server = Connection(self.0.clone());
295 let hl_conn_client = hl_conn_server.clone();
296 let store = rt.store.clone();
297 let policy = rt.policy.clone();
298 let signer = rt.local_signing_key.clone();
299 let spki = rt.local_spki.clone();
300 let runtime = conn.runtime.clone();
301
302 if conn.inner.side().is_server() {
303 runtime.spawn(Box::pin(async move {
304 match crate::trust::recv_verify_binding_ed25519(
305 &hl_conn_server,
306 &*store,
307 &policy,
308 )
309 .await
310 {
311 Ok(peer) => {
312 let mismatch = {
313 let mut state = hl_conn_server.0.state.lock("trust set peer");
314 let inner = &mut state.inner;
315 let expected = inner.expected_peer_id();
316 let mismatch = expected.is_some() && expected != Some(peer);
317 if !mismatch {
318 inner.set_token_binding_peer_id(peer);
319 inner.set_delay_new_token_until_binding(false);
320 }
321 mismatch
322 };
323
324 if mismatch {
325 hl_conn_server.close(0u32.into(), b"token peer mismatch");
326 } else if let Err(_) =
327 crate::trust::persist_peer_id_global(Some(peer))
328 {
329 hl_conn_server.close(0u32.into(), b"peer persist failed");
330 }
331 }
332 Err(_e) => {
333 hl_conn_server.close(0u32.into(), b"channel binding failed");
334 }
335 }
336 }));
337 }
338
339 if conn.inner.side().is_client() {
340 runtime.spawn(Box::pin(async move {
341 if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
342 let _ = crate::trust::send_binding_ed25519(
343 &hl_conn_client,
344 &exp,
345 &signer,
346 &spki,
347 )
348 .await;
349 }
350 }));
351 }
352
353 conn.binding_started = true;
354 }
355 }
356
357 if !conn.inner.is_drained() {
358 if keep_going {
359 // If the connection hasn't processed all tasks, schedule it again
360 cx.waker().wake_by_ref();
361 } else {
362 conn.driver = Some(cx.waker().clone());
363 }
364 return Poll::Pending;
365 }
366 if conn.error.is_none() {
367 unreachable!("drained connections always have an error");
368 }
369 Poll::Ready(Ok(()))
370 }
371}
372
373/// A QUIC connection.
374///
375/// If all references to a connection (including every clone of the `Connection` handle, streams of
376/// incoming streams, and the various stream types) have been dropped, then the connection will be
377/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
378/// connection explicitly by calling [`Connection::close()`].
379///
380/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
381/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
382/// application. [`Connection::close()`] describes in more detail how to gracefully close a
383/// connection without losing application data.
384///
385/// May be cloned to obtain another handle to the same connection.
386///
387/// [`Connection::close()`]: Connection::close
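///
/// # Example
///
/// A rough usage sketch; `endpoint` and `server_addr` are illustrative bindings, and the
/// connect and stream calls assume the quinn-style API exposed by this module:
///
/// ```ignore
/// let connection = endpoint.connect(server_addr, "example.com")?.await?;
/// let (mut send, mut recv) = connection.open_bi().await?;
/// send.write_all(b"request").await?;
/// // ... read the response from `recv` ...
/// connection.close(0u32.into(), b"done");
/// ```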
388#[derive(Debug, Clone)]
389pub struct Connection(ConnectionRef);
390
391impl Connection {
392 /// Initiate a new outgoing unidirectional stream.
393 ///
394 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
395 /// consequence, the peer won't be notified that a stream has been opened until the stream is
396 /// actually used.
397 pub fn open_uni(&self) -> OpenUni<'_> {
398 OpenUni {
399 conn: &self.0,
400 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
401 }
402 }
403
404 /// Initiate a new outgoing bidirectional stream.
405 ///
406 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
407 /// consequence, the peer won't be notified that a stream has been opened until the stream is
408 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
409 /// anything to [`SendStream`] will never succeed.
410 ///
411 /// [`open_bi()`]: Self::open_bi
412 /// [`SendStream`]: crate::SendStream
413 /// [`RecvStream`]: crate::RecvStream
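    ///
    /// An illustrative sketch (stream method names such as `write_all` follow the
    /// quinn-style `SendStream` API and are assumptions here):
    ///
    /// ```ignore
    /// let (mut send, recv) = connection.open_bi().await?;
    /// // The peer's `accept_bi()` only resolves once something is written.
    /// send.write_all(b"hello").await?;
    /// ```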
414 pub fn open_bi(&self) -> OpenBi<'_> {
415 OpenBi {
416 conn: &self.0,
417 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
418 }
419 }
420
421 /// Accept the next incoming uni-directional stream
422 pub fn accept_uni(&self) -> AcceptUni<'_> {
423 AcceptUni {
424 conn: &self.0,
425 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
426 }
427 }
428
429 /// Accept the next incoming bidirectional stream
430 ///
431 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
432 /// before the other `Connection` is able to [`accept_bi()`]. Calling [`open_bi()`] then
433 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
434 ///
435 /// [`accept_bi()`]: Self::accept_bi
436 /// [`open_bi()`]: Self::open_bi
437 /// [`SendStream`]: crate::SendStream
438 /// [`RecvStream`]: crate::RecvStream
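    ///
    /// An illustrative accept-loop sketch (identifiers are placeholders; spawning with
    /// `tokio::spawn` is an assumption about the surrounding runtime):
    ///
    /// ```ignore
    /// while let Ok((send, recv)) = connection.accept_bi().await {
    ///     // Hand each stream off to its own task.
    ///     tokio::spawn(async move {
    ///         // ... read the request from `recv`, write the reply to `send` ...
    ///         let _ = (send, recv);
    ///     });
    /// }
    /// ```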
439 pub fn accept_bi(&self) -> AcceptBi<'_> {
440 AcceptBi {
441 conn: &self.0,
442 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
443 }
444 }
445
446 /// Receive an application datagram
447 pub fn read_datagram(&self) -> ReadDatagram<'_> {
448 ReadDatagram {
449 conn: &self.0,
450 notify: self.0.shared.datagram_received.notified(),
451 }
452 }
453
454 /// Wait for the connection to be closed for any reason
455 ///
456 /// Despite the return type's name, closed connections are often not an error condition at the
457 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
458 /// and [`ConnectionError::ApplicationClosed`].
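    ///
    /// An illustrative sketch of watching for closure from a background task (`tokio` and
    /// `tracing` are already dependencies of this module; identifiers are placeholders):
    ///
    /// ```ignore
    /// let monitored = connection.clone();
    /// tokio::spawn(async move {
    ///     let reason = monitored.closed().await;
    ///     tracing::info!(%reason, "connection closed");
    /// });
    /// ```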
459 pub async fn closed(&self) -> ConnectionError {
460 {
461 let conn = self.0.state.lock("closed");
462 if let Some(error) = conn.error.as_ref() {
463 return error.clone();
464 }
465 // Construct the future while the lock is held to ensure we can't miss a wakeup if
466 // the `Notify` is signaled immediately after we release the lock. `await` it after
467 // the lock guard is out of scope.
468 self.0.shared.closed.notified()
469 }
470 .await;
471 self.0
472 .state
473 .lock("closed")
474 .error
475 .as_ref()
476 .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
477 .clone()
478 }
479
480 /// If the connection is closed, the reason why.
481 ///
482 /// Returns `None` if the connection is still open.
483 pub fn close_reason(&self) -> Option<ConnectionError> {
484 self.0.state.lock("close_reason").error.clone()
485 }
486
487 /// Close the connection immediately.
488 ///
489 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
490 /// more data is sent to the peer and the peer may drop buffered data upon receiving
491 /// the CONNECTION_CLOSE frame.
492 ///
493 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
494 ///
495 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
496 /// is preserved in full, it should be kept under 1KiB.
497 ///
498 /// # Gracefully closing a connection
499 ///
500 /// Only the peer last receiving application data can be certain that all data is
501 /// delivered. The only reliable action it can then take is to close the connection,
502 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
503 /// frame is very likely if both endpoints stay online long enough, and
504 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
505 /// remote peer will time out the connection, provided that the idle timeout is not
506 /// disabled.
507 ///
508 /// The sending side can not guarantee all stream data is delivered to the remote
509 /// application. It only knows the data is delivered to the QUIC stack of the remote
510 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
511 /// [`close()`] the remote endpoint may drop any data it received but is as yet
512 /// undelivered to the application, including data that was acknowledged as received to
513 /// the local endpoint.
514 ///
515 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
516 /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
517 /// [`close()`]: Connection::close
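    ///
    /// An illustrative sketch of a graceful shutdown on the side that received the last
    /// application data; `endpoint` is assumed to be the endpoint that owns this connection:
    ///
    /// ```ignore
    /// connection.close(0u32.into(), b"done");
    /// // Give the CONNECTION_CLOSE frame time to reach the peer before dropping the endpoint.
    /// endpoint.wait_idle().await;
    /// ```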
518 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
519 let conn = &mut *self.0.state.lock("close");
520 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
521 }
522
523 /// Transmit `data` as an unreliable, unordered application datagram
524 ///
525 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
526 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
527 /// dictated by the peer.
528 ///
529 /// Previously queued datagrams which are still unsent may be discarded to make space for this
530 /// datagram, in order of oldest to newest.
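    ///
    /// An illustrative sketch that only sends when the datagram fits the current limit
    /// (the payload value is a placeholder):
    ///
    /// ```ignore
    /// let payload = Bytes::from_static(b"ping");
    /// if connection.max_datagram_size().is_some_and(|max| payload.len() <= max) {
    ///     connection.send_datagram(payload)?;
    /// }
    /// ```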
531 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
532 let conn = &mut *self.0.state.lock("send_datagram");
533 if let Some(ref x) = conn.error {
534 return Err(SendDatagramError::ConnectionLost(x.clone()));
535 }
536 use crate::SendDatagramError::*;
537 match conn.inner.datagrams().send(data, true) {
538 Ok(()) => {
539 conn.wake();
540 Ok(())
541 }
542 Err(e) => Err(match e {
543 Blocked(..) => unreachable!(),
544 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
545 Disabled => SendDatagramError::Disabled,
546 TooLarge => SendDatagramError::TooLarge,
547 }),
548 }
549 }
550
551 /// Transmit `data` as an unreliable, unordered application datagram
552 ///
553 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
554 /// conditions, which effectively prioritizes old datagrams over new datagrams.
555 ///
556 /// See [`send_datagram()`] for details.
557 ///
558 /// [`send_datagram()`]: Connection::send_datagram
559 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
560 SendDatagram {
561 conn: &self.0,
562 data: Some(data),
563 notify: self.0.shared.datagrams_unblocked.notified(),
564 }
565 }
566
567 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
568 ///
569 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
570 ///
571 /// This may change over the lifetime of a connection according to variation in the path MTU
572 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
573 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
574 ///
575 /// Not necessarily the maximum size of received datagrams.
576 ///
577 /// [`send_datagram()`]: Connection::send_datagram
578 pub fn max_datagram_size(&self) -> Option<usize> {
579 self.0
580 .state
581 .lock("max_datagram_size")
582 .inner
583 .datagrams()
584 .max_size()
585 }
586
587 /// Bytes available in the outgoing datagram buffer
588 ///
589 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
590 /// at most this size is guaranteed not to cause older datagrams to be dropped.
591 pub fn datagram_send_buffer_space(&self) -> usize {
592 self.0
593 .state
594 .lock("datagram_send_buffer_space")
595 .inner
596 .datagrams()
597 .send_buffer_space()
598 }
599
600 /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
601 pub fn send_nat_address_advertisement(
602 &self,
603 address: SocketAddr,
604 priority: u32,
605 ) -> Result<u64, crate::ConnectionError> {
606 let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
607 conn.inner.send_nat_address_advertisement(address, priority)
608 }
609
610 /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
611 pub fn send_nat_punch_coordination(
612 &self,
613 paired_with_sequence_number: u64,
614 address: SocketAddr,
615 round: u32,
616 ) -> Result<(), crate::ConnectionError> {
617 let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
618 conn.inner
619 .send_nat_punch_coordination(paired_with_sequence_number, address, round)
620 }
621
622 /// The side of the connection (client or server)
623 pub fn side(&self) -> Side {
624 self.0.state.lock("side").inner.side()
625 }
626
627 /// The peer's UDP address
628 ///
629 /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
630 /// switching to a cellular internet connection.
631 pub fn remote_address(&self) -> SocketAddr {
632 self.0.state.lock("remote_address").inner.remote_address()
633 }
634
635 /// The local IP address which was used when the peer established
636 /// the connection
637 ///
638 /// This can be different from the address the endpoint is bound to, in case
639 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
640 ///
641 /// This will return `None` for clients, or when the platform does not expose this
642 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
643 /// supported platforms when using quinn_udp for I/O, which is the default.
644 pub fn local_ip(&self) -> Option<IpAddr> {
645 self.0.state.lock("local_ip").inner.local_ip()
646 }
647
648 /// Current best estimate of this connection's latency (round-trip-time)
649 pub fn rtt(&self) -> Duration {
650 self.0.state.lock("rtt").inner.rtt()
651 }
652
653 /// Returns connection statistics
654 pub fn stats(&self) -> ConnectionStats {
655 self.0.state.lock("stats").inner.stats()
656 }
657
658 /// Current state of the congestion control algorithm, for debugging purposes
659 pub fn congestion_state(&self) -> Box<dyn Controller> {
660 self.0
661 .state
662 .lock("congestion_state")
663 .inner
664 .congestion_state()
665 .clone_box()
666 }
667
668 /// Parameters negotiated during the handshake
669 ///
670 /// Guaranteed to return `Some` on fully established connections or after
671 /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
672 /// the returned value.
673 ///
674 /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
675 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
676 self.0
677 .state
678 .lock("handshake_data")
679 .inner
680 .crypto_session()
681 .handshake_data()
682 }
683
684 /// Cryptographic identity of the peer
685 ///
686 /// The dynamic type returned is determined by the configured
687 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
688 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
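    ///
    /// An illustrative sketch for the default `rustls` session; the concrete downcast type
    /// (including the `'static` lifetime) is assumed to match the upstream layout:
    ///
    /// ```ignore
    /// if let Some(identity) = connection.peer_identity() {
    ///     if let Ok(certs) = identity.downcast::<Vec<rustls::pki_types::CertificateDer<'static>>>() {
    ///         println!("peer presented {} certificate(s)", certs.len());
    ///     }
    /// }
    /// ```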
689 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
690 self.0
691 .state
692 .lock("peer_identity")
693 .inner
694 .crypto_session()
695 .peer_identity()
696 }
697
698 /// A stable identifier for this connection
699 ///
700 /// Peer addresses and connection IDs can change, but this value will remain
701 /// fixed for the lifetime of the connection.
702 pub fn stable_id(&self) -> usize {
703 self.0.stable_id()
704 }
705
706 /// Returns true if this connection negotiated post-quantum settings.
707 ///
708 /// This reflects either explicit PQC algorithms advertised via transport
709 /// parameters or in-band detection from handshake CRYPTO frames.
710 pub fn is_pqc(&self) -> bool {
711 let state = self.0.state.lock("is_pqc");
712 state.inner.is_pqc()
713 }
714
715 /// Debug-only hint: returns true when the underlying TLS provider was
716 /// configured to run in KEM-only (ML-KEM) mode. This is a diagnostic aid
717 /// for tests and does not itself guarantee group enforcement.
718 pub fn debug_kem_only(&self) -> bool {
719 #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
720 {
721 crate::crypto::rustls::debug_kem_only_enabled()
722 }
723 #[cfg(not(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring")))]
724 {
725 false
726 }
727 }
728
729 /// Update traffic keys spontaneously
730 ///
731 /// This primarily exists for testing purposes.
732 pub fn force_key_update(&self) {
733 self.0
734 .state
735 .lock("force_key_update")
736 .inner
737 .force_key_update()
738 }
739
740 /// Derive keying material from this connection's TLS session secrets.
741 ///
742 /// When both peers call this method with the same `label` and `context`
743 /// arguments and `output` buffers of equal length, they will get the
744 /// same sequence of bytes in `output`. These bytes are cryptographically
745 /// strong and pseudorandom, and are suitable for use as keying material.
746 ///
747 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
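    ///
    /// An illustrative sketch deriving 32 bytes of shared keying material (the label and
    /// context values are placeholders that both peers must agree on):
    ///
    /// ```ignore
    /// let mut key = [0u8; 32];
    /// connection.export_keying_material(&mut key, b"my-app key", b"session context")?;
    /// // Both endpoints now hold the same 32-byte secret.
    /// ```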
748 pub fn export_keying_material(
749 &self,
750 output: &mut [u8],
751 label: &[u8],
752 context: &[u8],
753 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
754 self.0
755 .state
756 .lock("export_keying_material")
757 .inner
758 .crypto_session()
759 .export_keying_material(output, label, context)
760 }
761
762 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
763 ///
764 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
765 /// `count`s increase both minimum and worst-case memory consumption.
766 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
767 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
768 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
769 // May need to send MAX_STREAMS to make progress
770 conn.wake();
771 }
772
773 /// See [`crate::TransportConfig::receive_window()`]
774 pub fn set_receive_window(&self, receive_window: VarInt) {
775 let mut conn = self.0.state.lock("set_receive_window");
776 conn.inner.set_receive_window(receive_window);
777 conn.wake();
778 }
779
780 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
781 ///
782 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
783 /// `count`s increase both minimum and worst-case memory consumption.
784 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
785 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
786 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
787 // May need to send MAX_STREAMS to make progress
788 conn.wake();
789 }
790
791 /// Set up qlog for this connection.
792 #[cfg(feature = "__qlog")]
793 pub fn set_qlog(
794 &mut self,
795 writer: Box<dyn std::io::Write + Send + Sync>,
796 title: Option<String>,
797 description: Option<String>,
798 ) {
799 let mut state = self.0.state.lock("__qlog");
800 state
801 .inner
802 .set_qlog(writer, title, description, Instant::now());
803 }
804}
805
806pin_project! {
807 /// Future produced by [`Connection::open_uni`]
808 pub struct OpenUni<'a> {
809 conn: &'a ConnectionRef,
810 #[pin]
811 notify: Notified<'a>,
812 }
813}
814
815impl Future for OpenUni<'_> {
816 type Output = Result<SendStream, ConnectionError>;
817 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
818 let this = self.project();
819 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
820 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
821 }
822}
823
824pin_project! {
825 /// Future produced by [`Connection::open_bi`]
826 pub struct OpenBi<'a> {
827 conn: &'a ConnectionRef,
828 #[pin]
829 notify: Notified<'a>,
830 }
831}
832
833impl Future for OpenBi<'_> {
834 type Output = Result<(SendStream, RecvStream), ConnectionError>;
835 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
836 let this = self.project();
837 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
838
839 Poll::Ready(Ok((
840 SendStream::new(conn.clone(), id, is_0rtt),
841 RecvStream::new(conn, id, is_0rtt),
842 )))
843 }
844}
845
846fn poll_open<'a>(
847 ctx: &mut Context<'_>,
848 conn: &'a ConnectionRef,
849 mut notify: Pin<&mut Notified<'a>>,
850 dir: Dir,
851) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
852 let mut state = conn.state.lock("poll_open");
853 if let Some(ref e) = state.error {
854 return Poll::Ready(Err(e.clone()));
855 } else if let Some(id) = state.inner.streams().open(dir) {
856 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
857 drop(state); // Release the lock so clone can take it
858 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
859 }
860 loop {
861 match notify.as_mut().poll(ctx) {
862 // `state` lock ensures we didn't race with readiness
863 Poll::Pending => return Poll::Pending,
864 // Spurious wakeup, get a new future
865 Poll::Ready(()) => {
866 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
867 }
868 }
869 }
870}
871
872pin_project! {
873 /// Future produced by [`Connection::accept_uni`]
874 pub struct AcceptUni<'a> {
875 conn: &'a ConnectionRef,
876 #[pin]
877 notify: Notified<'a>,
878 }
879}
880
881impl Future for AcceptUni<'_> {
882 type Output = Result<RecvStream, ConnectionError>;
883
884 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
885 let this = self.project();
886 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
887 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
888 }
889}
890
891pin_project! {
892 /// Future produced by [`Connection::accept_bi`]
893 pub struct AcceptBi<'a> {
894 conn: &'a ConnectionRef,
895 #[pin]
896 notify: Notified<'a>,
897 }
898}
899
900impl Future for AcceptBi<'_> {
901 type Output = Result<(SendStream, RecvStream), ConnectionError>;
902
903 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
904 let this = self.project();
905 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
906 Poll::Ready(Ok((
907 SendStream::new(conn.clone(), id, is_0rtt),
908 RecvStream::new(conn, id, is_0rtt),
909 )))
910 }
911}
912
913fn poll_accept<'a>(
914 ctx: &mut Context<'_>,
915 conn: &'a ConnectionRef,
916 mut notify: Pin<&mut Notified<'a>>,
917 dir: Dir,
918) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
919 let mut state = conn.state.lock("poll_accept");
920 // Check for incoming streams before checking `state.error` so that already-received streams,
921 // which are necessarily finite, can be drained from a closed connection.
922 if let Some(id) = state.inner.streams().accept(dir) {
923 let is_0rtt = state.inner.is_handshaking();
924 state.wake(); // To send additional stream ID credit
925 drop(state); // Release the lock so clone can take it
926 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
927 } else if let Some(ref e) = state.error {
928 return Poll::Ready(Err(e.clone()));
929 }
930 loop {
931 match notify.as_mut().poll(ctx) {
932 // `state` lock ensures we didn't race with readiness
933 Poll::Pending => return Poll::Pending,
934 // Spurious wakeup, get a new future
935 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
936 }
937 }
938}
939
940pin_project! {
941 /// Future produced by [`Connection::read_datagram`]
942 pub struct ReadDatagram<'a> {
943 conn: &'a ConnectionRef,
944 #[pin]
945 notify: Notified<'a>,
946 }
947}
948
949impl Future for ReadDatagram<'_> {
950 type Output = Result<Bytes, ConnectionError>;
951 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
952 let mut this = self.project();
953 let mut state = this.conn.state.lock("ReadDatagram::poll");
954 // Check for buffered datagrams before checking `state.error` so that already-received
955 // datagrams, which are necessarily finite, can be drained from a closed connection.
956 match state.inner.datagrams().recv() {
957 Some(x) => {
958 return Poll::Ready(Ok(x));
959 }
960 _ => {
961 if let Some(ref e) = state.error {
962 return Poll::Ready(Err(e.clone()));
963 }
964 }
965 }
966 loop {
967 match this.notify.as_mut().poll(ctx) {
968 // `state` lock ensures we didn't race with readiness
969 Poll::Pending => return Poll::Pending,
970 // Spurious wakeup, get a new future
971 Poll::Ready(()) => this
972 .notify
973 .set(this.conn.shared.datagram_received.notified()),
974 }
975 }
976 }
977}
978
979pin_project! {
980 /// Future produced by [`Connection::send_datagram_wait`]
981 pub struct SendDatagram<'a> {
982 conn: &'a ConnectionRef,
983 data: Option<Bytes>,
984 #[pin]
985 notify: Notified<'a>,
986 }
987}
988
989impl Future for SendDatagram<'_> {
990 type Output = Result<(), SendDatagramError>;
991 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
992 let mut this = self.project();
993 let mut state = this.conn.state.lock("SendDatagram::poll");
994 if let Some(ref e) = state.error {
995 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
996 }
997 use crate::SendDatagramError::*;
998 match state.inner.datagrams().send(
999 this.data.take().ok_or_else(|| {
1000 error!("SendDatagram future polled without data");
1001 SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
1002 })?,
1003 false,
1004 ) {
1005 Ok(()) => {
1006 state.wake();
1007 Poll::Ready(Ok(()))
1008 }
1009 Err(e) => Poll::Ready(Err(match e {
1010 Blocked(data) => {
1011 this.data.replace(data);
1012 loop {
1013 match this.notify.as_mut().poll(ctx) {
1014 Poll::Pending => return Poll::Pending,
1015 // Spurious wakeup, get a new future
1016 Poll::Ready(()) => this
1017 .notify
1018 .set(this.conn.shared.datagrams_unblocked.notified()),
1019 }
1020 }
1021 }
1022 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1023 Disabled => SendDatagramError::Disabled,
1024 TooLarge => SendDatagramError::TooLarge,
1025 })),
1026 }
1027 }
1028}
1029
1030#[derive(Debug)]
1031pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1032
1033impl ConnectionRef {
1034 #[allow(clippy::too_many_arguments)]
1035 fn new(
1036 handle: ConnectionHandle,
1037 conn: crate::Connection,
1038 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1039 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1040 on_handshake_data: oneshot::Sender<()>,
1041 on_connected: oneshot::Sender<bool>,
1042 socket: Arc<dyn AsyncUdpSocket>,
1043 runtime: Arc<dyn Runtime>,
1044 ) -> Self {
1045 Self(Arc::new(ConnectionInner {
1046 state: Mutex::new(State {
1047 inner: conn,
1048 driver: None,
1049 handle,
1050 on_handshake_data: Some(on_handshake_data),
1051 on_connected: Some(on_connected),
1052 connected: false,
1053 timer: None,
1054 timer_deadline: None,
1055 conn_events,
1056 endpoint_events,
1057 blocked_writers: FxHashMap::default(),
1058 blocked_readers: FxHashMap::default(),
1059 stopped: FxHashMap::default(),
1060 error: None,
1061 ref_count: 0,
1062 io_poller: socket.clone().create_io_poller(),
1063 socket,
1064 runtime,
1065 send_buffer: Vec::new(),
1066 buffered_transmit: None,
1067 binding_started: false,
1068 }),
1069 shared: Shared::default(),
1070 }))
1071 }
1072
1073 fn stable_id(&self) -> usize {
1074 &*self.0 as *const _ as usize
1075 }
1076}
1077
1078impl Clone for ConnectionRef {
1079 fn clone(&self) -> Self {
1080 self.state.lock("clone").ref_count += 1;
1081 Self(self.0.clone())
1082 }
1083}
1084
1085impl Drop for ConnectionRef {
1086 fn drop(&mut self) {
1087 let conn = &mut *self.state.lock("drop");
1088 if let Some(x) = conn.ref_count.checked_sub(1) {
1089 conn.ref_count = x;
1090 if x == 0 && !conn.inner.is_closed() {
1091 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1092 // not, we can't do any harm. If there were any streams being opened, then either
1093 // the connection will be closed for an unrelated reason or a fresh reference will
1094 // be constructed for the newly opened stream.
1095 conn.implicit_close(&self.shared);
1096 }
1097 }
1098 }
1099}
1100
1101impl std::ops::Deref for ConnectionRef {
1102 type Target = ConnectionInner;
1103 fn deref(&self) -> &Self::Target {
1104 &self.0
1105 }
1106}
1107
1108#[derive(Debug)]
1109pub(crate) struct ConnectionInner {
1110 pub(crate) state: Mutex<State>,
1111 pub(crate) shared: Shared,
1112}
1113
1114#[derive(Debug, Default)]
1115pub(crate) struct Shared {
1116 /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1117 /// control budget
1118 stream_budget_available: [Notify; 2],
1119 /// Notified when the peer has initiated a new stream
1120 stream_incoming: [Notify; 2],
1121 datagram_received: Notify,
1122 datagrams_unblocked: Notify,
1123 closed: Notify,
1124}
1125
1126pub(crate) struct State {
1127 pub(crate) inner: crate::Connection,
1128 driver: Option<Waker>,
1129 handle: ConnectionHandle,
1130 on_handshake_data: Option<oneshot::Sender<()>>,
1131 on_connected: Option<oneshot::Sender<bool>>,
1132 connected: bool,
1133 timer: Option<Pin<Box<dyn AsyncTimer>>>,
1134 timer_deadline: Option<Instant>,
1135 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1136 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1137 pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1138 pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1139 pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1140 /// Always set to Some before the connection becomes drained
1141 pub(crate) error: Option<ConnectionError>,
1142 /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1143 ref_count: usize,
1144 socket: Arc<dyn AsyncUdpSocket>,
1145 io_poller: Pin<Box<dyn UdpPoller>>,
1146 runtime: Arc<dyn Runtime>,
1147 send_buffer: Vec<u8>,
1148 /// We buffer a transmit when the underlying I/O would block
1149 buffered_transmit: Option<crate::Transmit>,
1150 /// True once we've initiated automatic channel binding (if enabled)
1151 binding_started: bool,
1152}
1153
1154impl State {
1155 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1156 let now = self.runtime.now();
1157 let mut transmits = 0;
1158
1159 let max_datagrams = self
1160 .socket
1161 .max_transmit_segments()
1162 .min(MAX_TRANSMIT_SEGMENTS);
1163
1164 loop {
1165 // Retry the last transmit, or get a new one.
1166 let t = match self.buffered_transmit.take() {
1167 Some(t) => t,
1168 None => {
1169 self.send_buffer.clear();
1170 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1171 match self
1172 .inner
1173 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1174 {
1175 Some(t) => {
1176 transmits += match t.segment_size {
1177 None => 1,
1178 Some(s) => t.size.div_ceil(s), // round up
1179 };
1180 t
1181 }
1182 None => break,
1183 }
1184 }
1185 };
1186
1187 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1188 // Retry after a future wakeup
1189 self.buffered_transmit = Some(t);
1190 return Ok(false);
1191 }
1192
1193 let len = t.size;
1194 let retry = match self
1195 .socket
1196 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1197 {
1198 Ok(()) => false,
1199 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1200 Err(e) => return Err(e),
1201 };
1202 if retry {
1203 // We thought the socket was writable, but it wasn't. Retry so that either another
1204 // `poll_writable` call determines that the socket is indeed not writable and
1205 // registers us for a wakeup, or the send succeeds if this really was just a
1206 // transient failure.
1207 self.buffered_transmit = Some(t);
1208 continue;
1209 }
1210
1211 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1212 // TODO: What isn't ideal here yet is that if we don't poll all
1213 // datagrams that could be sent we don't go into the `app_limited`
1214 // state and CWND continues to grow until we get here the next time.
1215 // See https://github.com/quinn-rs/quinn/issues/1126
1216 return Ok(true);
1217 }
1218 }
1219
1220 Ok(false)
1221 }
1222
1223 fn forward_endpoint_events(&mut self) {
1224 while let Some(event) = self.inner.poll_endpoint_events() {
1225 // If the endpoint driver is gone, noop.
1226 let _ = self.endpoint_events.send((self.handle, event));
1227 }
1228 }
1229
1230 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1231 fn process_conn_events(
1232 &mut self,
1233 shared: &Shared,
1234 cx: &mut Context,
1235 ) -> Result<(), ConnectionError> {
1236 loop {
1237 match self.conn_events.poll_recv(cx) {
1238 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1239 self.socket = socket;
1240 self.io_poller = self.socket.clone().create_io_poller();
1241 self.inner.local_address_changed();
1242 }
1243 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1244 self.inner.handle_event(event);
1245 }
1246 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1247 self.close(error_code, reason, shared);
1248 }
1249 Poll::Ready(None) => {
1250 return Err(ConnectionError::TransportError(crate::TransportError {
1251 code: crate::TransportErrorCode::INTERNAL_ERROR,
1252 frame: None,
1253 reason: "endpoint driver future was dropped".to_string(),
1254 }));
1255 }
1256 Poll::Pending => {
1257 return Ok(());
1258 }
1259 }
1260 }
1261 }
1262
1263 fn forward_app_events(&mut self, shared: &Shared) {
1264 while let Some(event) = self.inner.poll() {
1265 use crate::Event::*;
1266 match event {
1267 HandshakeDataReady => {
1268 if let Some(x) = self.on_handshake_data.take() {
1269 let _ = x.send(());
1270 }
1271 }
1272 Connected => {
1273 self.connected = true;
1274 if let Some(x) = self.on_connected.take() {
1275 // We don't care if the on-connected future was dropped
1276 let _ = x.send(self.inner.accepted_0rtt());
1277 }
1278 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1279 // Wake up rejected 0-RTT streams so they can fail immediately with
1280 // `ZeroRttRejected` errors.
1281 wake_all(&mut self.blocked_writers);
1282 wake_all(&mut self.blocked_readers);
1283 wake_all_notify(&mut self.stopped);
1284 }
1285 }
1286 ConnectionLost { reason } => {
1287 self.terminate(reason, shared);
1288 }
1289 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1290 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1291 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1292 }
1293 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1294 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1295 }
1296 DatagramReceived => {
1297 shared.datagram_received.notify_waiters();
1298 }
1299 DatagramsUnblocked => {
1300 shared.datagrams_unblocked.notify_waiters();
1301 }
1302 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1303 Stream(StreamEvent::Available { dir }) => {
1304 // Might mean any number of streams are ready, so we wake up everyone
1305 shared.stream_budget_available[dir as usize].notify_waiters();
1306 }
1307 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1308 Stream(StreamEvent::Stopped { id, .. }) => {
1309 wake_stream_notify(id, &mut self.stopped);
1310 wake_stream(id, &mut self.blocked_writers);
1311 }
1312 }
1313 }
1314 }
1315
1316 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1317 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1318 // timer is registered with the runtime (and check whether it's already
1319 // expired).
1320 match self.inner.poll_timeout() {
1321 Some(deadline) => {
1322 if let Some(delay) = &mut self.timer {
1323 // There is no need to reset the tokio timer if the deadline
1324 // did not change
1325 if self
1326 .timer_deadline
1327 .map(|current_deadline| current_deadline != deadline)
1328 .unwrap_or(true)
1329 {
1330 delay.as_mut().reset(deadline);
1331 }
1332 } else {
1333 self.timer = Some(self.runtime.new_timer(deadline));
1334 }
1335 // Store the actual expiration time of the timer
1336 self.timer_deadline = Some(deadline);
1337 }
1338 None => {
1339 self.timer_deadline = None;
1340 return false;
1341 }
1342 }
1343
1344 if self.timer_deadline.is_none() {
1345 return false;
1346 }
1347
1348 let delay = match self.timer.as_mut() {
1349 Some(timer) => timer.as_mut(),
1350 None => {
1351 error!("Timer missing in state where it should exist");
1352 return false;
1353 }
1354 };
1355 if delay.poll(cx).is_pending() {
1356 // Since there wasn't a timeout event, there is nothing new
1357 // for the connection to do
1358 return false;
1359 }
1360
1361 // A timer expired, so the caller needs to check for
1362 // new transmits, which might cause new timers to be set.
1363 self.inner.handle_timeout(self.runtime.now());
1364 self.timer_deadline = None;
1365 true
1366 }
1367
1368 /// Wake up a blocked `Driver` task to process I/O
1369 pub(crate) fn wake(&mut self) {
1370 if let Some(x) = self.driver.take() {
1371 x.wake();
1372 }
1373 }
1374
1375 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1376 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1377 self.error = Some(reason.clone());
1378 if let Some(x) = self.on_handshake_data.take() {
1379 let _ = x.send(());
1380 }
1381 wake_all(&mut self.blocked_writers);
1382 wake_all(&mut self.blocked_readers);
1383 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1384 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1385 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1386 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1387 shared.datagram_received.notify_waiters();
1388 shared.datagrams_unblocked.notify_waiters();
1389 if let Some(x) = self.on_connected.take() {
1390 let _ = x.send(false);
1391 }
1392 wake_all_notify(&mut self.stopped);
1393 shared.closed.notify_waiters();
1394 }
1395
1396 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1397 self.inner.close(self.runtime.now(), error_code, reason);
1398 self.terminate(ConnectionError::LocallyClosed, shared);
1399 self.wake();
1400 }
1401
1402 /// Close for a reason other than the application's explicit request
1403 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1404 self.close(0u32.into(), Bytes::new(), shared);
1405 }
1406
1407 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1408 if self.inner.is_handshaking()
1409 || self.inner.accepted_0rtt()
1410 || self.inner.side().is_server()
1411 {
1412 Ok(())
1413 } else {
1414 Err(())
1415 }
1416 }
1417}
1418
1419impl Drop for State {
1420 fn drop(&mut self) {
1421 if !self.inner.is_drained() {
1422 // Ensure the endpoint can tidy up
1423 let _ = self
1424 .endpoint_events
1425 .send((self.handle, crate::EndpointEvent::drained()));
1426 }
1427 }
1428}
1429
1430impl fmt::Debug for State {
1431 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1432 f.debug_struct("State").field("inner", &self.inner).finish()
1433 }
1434}
1435
1436fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1437 if let Some(waker) = wakers.remove(&stream_id) {
1438 waker.wake();
1439 }
1440}
1441
1442fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1443 wakers.drain().for_each(|(_, waker)| waker.wake())
1444}
1445
1446fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1447 if let Some(notify) = wakers.remove(&stream_id) {
1448 notify.notify_waiters()
1449 }
1450}
1451
1452fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1453 wakers
1454 .drain()
1455 .for_each(|(_, notify)| notify.notify_waiters())
1456}
1457
1458/// Errors that can arise when sending a datagram
1459#[derive(Debug, Error, Clone, Eq, PartialEq)]
1460pub enum SendDatagramError {
1461 /// The peer does not support receiving datagram frames
1462 #[error("datagrams not supported by peer")]
1463 UnsupportedByPeer,
1464 /// Datagram support is disabled locally
1465 #[error("datagram support disabled")]
1466 Disabled,
1467 /// The datagram is larger than the connection can currently accommodate
1468 ///
1469 /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1470 /// exceeded.
1471 #[error("datagram too large")]
1472 TooLarge,
1473 /// The connection was lost
1474 #[error("connection lost")]
1475 ConnectionLost(#[from] ConnectionError),
1476}
1477
1478 /// The maximum number of datagrams which will be produced in a single `drive_transmit` call
1479///
1480/// This limits the amount of CPU resources consumed by datagram generation,
1481/// and allows other tasks (like receiving ACKs) to run in between.
1482const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1483
1484 /// The maximum number of datagrams that are sent in a single transmit
1485///
1486/// This can be lower than the maximum platform capabilities, to avoid excessive
1487/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1488/// that numbers around 10 are a good compromise.
1489const MAX_TRANSMIT_SEGMENTS: usize = 10;