ant_quic/high_level/connection.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9 any::Any,
10 fmt,
11 future::Future,
12 io,
13 net::{IpAddr, SocketAddr},
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll, Waker, ready},
17};
18
19use bytes::Bytes;
20use pin_project_lite::pin_project;
21use rustc_hash::FxHashMap;
22use thiserror::Error;
23use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
24use tracing::{Instrument, Span, debug_span, error};
25
26use super::{
27 ConnectionEvent,
28 mutex::Mutex,
29 recv_stream::RecvStream,
30 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
31 send_stream::SendStream,
32 udp_transmit,
33};
34use crate::{
35 ConnectError, ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent,
36 Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
37};
38
39/// In-progress connection attempt future
40#[derive(Debug)]
41pub struct Connecting {
42 conn: Option<ConnectionRef>,
43 connected: oneshot::Receiver<bool>,
44 handshake_data_ready: Option<oneshot::Receiver<()>>,
45}
46
47impl Connecting {
48 pub(crate) fn new(
49 handle: ConnectionHandle,
50 conn: crate::Connection,
51 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53 socket: Arc<dyn AsyncUdpSocket>,
54 runtime: Arc<dyn Runtime>,
55 ) -> Self {
56 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57 let (on_connected_send, on_connected_recv) = oneshot::channel();
58 let conn = ConnectionRef::new(
59 handle,
60 conn,
61 endpoint_events,
62 conn_events,
63 on_handshake_data_send,
64 on_connected_send,
65 socket,
66 runtime.clone(),
67 );
68
69 let driver = ConnectionDriver(conn.clone());
70 runtime.spawn(Box::pin(
71 async {
72 if let Err(e) = driver.await {
73 tracing::error!("I/O error: {e}");
74 }
75 }
76 .instrument(Span::current()),
77 ));
78
79 Self {
80 conn: Some(conn),
81 connected: on_connected_recv,
82 handshake_data_ready: Some(on_handshake_data_recv),
83 }
84 }
85
86 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87 ///
88 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89 /// If so, the returned [`Connection`] can be used to send application data without waiting for
90 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92 /// complete, at which point subsequently opened streams and written data will have full
93 /// cryptographic protection.
94 ///
95 /// ## Outgoing
96 ///
97 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103 /// If it was rejected, the existence of streams opened and other application data sent prior
104 /// to the handshake completing will not be conveyed to the remote application, and local
105 /// operations on them will return `ZeroRttRejected` errors.
106 ///
107 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108 /// relevant resumption state to be stored in the server, which servers may limit or lose for
109 /// various reasons including not persisting resumption state across server restarts.
110 ///
111 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112 /// implementation's docs for 0-RTT pitfalls.
113 ///
114 /// ## Incoming
115 ///
116 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118 ///
119 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120 /// implementation's docs for 0-RTT pitfalls.
121 ///
122 /// ## Security
123 ///
124 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125 /// replay attacks, and should therefore never invoke non-idempotent operations.
126 ///
127 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128 /// before TLS client authentication has occurred, and should therefore not be used to send
129 /// data for which client authentication is being used.
130 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132 // have to release it explicitly before returning `self` by value.
133 let conn = match self.conn.as_mut() {
134 Some(conn) => conn.state.lock("into_0rtt"),
135 None => {
136 return Err(self);
137 }
138 };
139
140 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
141 drop(conn);
142
143 if is_ok {
144 match self.conn.take() {
145 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
146 None => {
147 tracing::error!("Connection state missing during 0-RTT acceptance");
148 Err(self)
149 }
150 }
151 } else {
152 Err(self)
153 }
154 }
155
156 /// Parameters negotiated during the handshake
157 ///
158 /// The dynamic type returned is determined by the configured
159 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
160 /// be [`downcast`](Box::downcast) to a
161 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
162 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
163 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
164 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
165 // simple.
166 if let Some(x) = self.handshake_data_ready.take() {
167 let _ = x.await;
168 }
169 let conn = self.conn.as_ref().ok_or_else(|| {
170 tracing::error!("Connection state missing while retrieving handshake data");
171 ConnectionError::LocallyClosed
172 })?;
173 let inner = conn.state.lock("handshake");
174 inner
175 .inner
176 .crypto_session()
177 .handshake_data()
178 .ok_or_else(|| {
179 inner.error.clone().unwrap_or_else(|| {
180 error!("Spurious handshake data ready notification with no error");
181 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
182 "Spurious handshake notification".to_string(),
183 ))
184 })
185 })
186 }
187
188 /// The local IP address which was used when the peer established
189 /// the connection
190 ///
191 /// This can be different from the address the endpoint is bound to, in case
192 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
193 ///
194 /// This will return `None` for clients, or when the platform does not expose this
195 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
196 /// supported platforms when using quinn_udp for I/O, which is the default.
197 ///
198 /// Will panic if called after `poll` has returned `Ready`.
199 pub fn local_ip(&self) -> Option<IpAddr> {
200 let conn = self.conn.as_ref()?;
201 let inner = conn.state.lock("local_ip");
202
203 inner.inner.local_ip()
204 }
205
206 /// The peer's UDP address
207 ///
208 /// Returns an error if called after `poll` has returned `Ready`.
209 pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
210 let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
211 error!("Connection used after yielding Ready");
212 ConnectionError::LocallyClosed
213 })?;
214 Ok(conn_ref.state.lock("remote_address").inner.remote_address())
215 }
216}
217
218impl Future for Connecting {
219 type Output = Result<Connection, ConnectionError>;
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
221 Pin::new(&mut self.connected).poll(cx).map(|_| {
222 let conn = self.conn.take().ok_or_else(|| {
223 error!("Connection not available when connecting future resolves");
224 ConnectionError::LocallyClosed
225 })?;
226 let inner = conn.state.lock("connecting");
227 if inner.connected {
228 drop(inner);
229 Ok(Connection(conn))
230 } else {
231 Err(inner.error.clone().unwrap_or_else(|| {
232 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
233 "connection failed without error".to_string(),
234 ))
235 }))
236 }
237 })
238 }
239}
240
241/// Future that completes when a connection is fully established
242///
243/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
244/// value is meaningless.
245pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
246
247impl Future for ZeroRttAccepted {
248 type Output = bool;
249 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
250 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
251 }
252}
253
254/// A future that drives protocol logic for a connection
255///
256/// This future handles the protocol logic for a single connection, routing events from the
257/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
258/// It also keeps track of outstanding timeouts for the `Connection`.
259///
260/// If the connection encounters an error condition, this future will yield an error. It will
261/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
262/// connection-related futures, this waits for the draining period to complete to ensure that
263/// packets still in flight from the peer are handled gracefully.
264#[must_use = "connection drivers must be spawned for their connections to function"]
265#[derive(Debug)]
266struct ConnectionDriver(ConnectionRef);
267
268impl Future for ConnectionDriver {
269 type Output = Result<(), io::Error>;
270
271 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
272 let conn = &mut *self.0.state.lock("poll");
273
274 let span = debug_span!("drive", id = conn.handle.0);
275 let _guard = span.enter();
276
277 if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
278 conn.terminate(e, &self.0.shared);
279 return Poll::Ready(Ok(()));
280 }
281 let mut keep_going = conn.drive_transmit(cx)?;
282 // If a timer expires, there might be more to transmit. When we transmit something, we
283 // might need to reset a timer. Hence, we must loop until neither happens.
284 keep_going |= conn.drive_timer(cx);
285 conn.forward_endpoint_events();
286 conn.forward_app_events(&self.0.shared);
287
288 // Kick off automatic channel binding once connected, if configured
289 if conn.connected && !conn.binding_started {
290 if let Some(rt) = crate::trust::global_runtime() {
291 // Delay NEW_TOKEN until binding completes
292 conn.inner.set_delay_new_token_until_binding(true);
293
294 let hl_conn_server = Connection(self.0.clone());
295 let hl_conn_client = hl_conn_server.clone();
296 let store = rt.store.clone();
297 let policy = rt.policy.clone();
298 let signer = rt.local_secret_key.clone();
299 let spki = rt.local_spki.clone();
300 let runtime = conn.runtime.clone();
301
302 if conn.inner.side().is_server() {
303 runtime.spawn(Box::pin(async move {
304 match crate::trust::recv_verify_binding(&hl_conn_server, &*store, &policy)
305 .await
306 {
307 Ok(peer) => {
308 hl_conn_server
309 .0
310 .state
311 .lock("set peer")
312 .inner
313 .set_token_binding_peer_id(peer);
314 hl_conn_server
315 .0
316 .state
317 .lock("allow tokens")
318 .inner
319 .set_delay_new_token_until_binding(false);
320 }
321 Err(_e) => {
322 hl_conn_server.close(0u32.into(), b"channel binding failed");
323 }
324 }
325 }));
326 }
327
328 if conn.inner.side().is_client() {
329 runtime.spawn(Box::pin(async move {
330 if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
331 let _ =
332 crate::trust::send_binding(&hl_conn_client, &exp, &signer, &spki)
333 .await;
334 }
335 }));
336 }
337
338 conn.binding_started = true;
339 }
340 }
341
342 if !conn.inner.is_drained() {
343 if keep_going {
344 // If the connection hasn't processed all tasks, schedule it again
345 cx.waker().wake_by_ref();
346 } else {
347 conn.driver = Some(cx.waker().clone());
348 }
349 return Poll::Pending;
350 }
351 if conn.error.is_none() {
352 unreachable!("drained connections always have an error");
353 }
354 Poll::Ready(Ok(()))
355 }
356}
357
358/// A QUIC connection.
359///
360/// If all references to a connection (including every clone of the `Connection` handle, streams of
361/// incoming streams, and the various stream types) have been dropped, then the connection will be
362/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
363/// connection explicitly by calling [`Connection::close()`].
364///
365/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
366/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
367/// application. [`Connection::close()`] describes in more detail how to gracefully close a
368/// connection without losing application data.
369///
370/// May be cloned to obtain another handle to the same connection.
371///
372/// [`Connection::close()`]: Connection::close
373#[derive(Debug, Clone)]
374pub struct Connection(ConnectionRef);
375
376impl Connection {
377 /// Initiate a new outgoing unidirectional stream.
378 ///
379 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
380 /// consequence, the peer won't be notified that a stream has been opened until the stream is
381 /// actually used.
382 pub fn open_uni(&self) -> OpenUni<'_> {
383 OpenUni {
384 conn: &self.0,
385 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
386 }
387 }
388
389 /// Initiate a new outgoing bidirectional stream.
390 ///
391 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
392 /// consequence, the peer won't be notified that a stream has been opened until the stream is
393 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
394 /// anything to [`SendStream`] will never succeed.
395 ///
396 /// [`open_bi()`]: Self::open_bi
397 /// [`SendStream`]: crate::SendStream
398 /// [`RecvStream`]: crate::RecvStream
399 pub fn open_bi(&self) -> OpenBi<'_> {
400 OpenBi {
401 conn: &self.0,
402 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
403 }
404 }
405
406 /// Accept the next incoming uni-directional stream
407 pub fn accept_uni(&self) -> AcceptUni<'_> {
408 AcceptUni {
409 conn: &self.0,
410 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
411 }
412 }
413
414 /// Accept the next incoming bidirectional stream
415 ///
416 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
417 /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
418 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
419 ///
420 /// [`accept_bi()`]: Self::accept_bi
421 /// [`open_bi()`]: Self::open_bi
422 /// [`SendStream`]: crate::SendStream
423 /// [`RecvStream`]: crate::RecvStream
424 pub fn accept_bi(&self) -> AcceptBi<'_> {
425 AcceptBi {
426 conn: &self.0,
427 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
428 }
429 }
430
431 /// Receive an application datagram
432 pub fn read_datagram(&self) -> ReadDatagram<'_> {
433 ReadDatagram {
434 conn: &self.0,
435 notify: self.0.shared.datagram_received.notified(),
436 }
437 }
438
439 /// Wait for the connection to be closed for any reason
440 ///
441 /// Despite the return type's name, closed connections are often not an error condition at the
442 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
443 /// and [`ConnectionError::ApplicationClosed`].
444 pub async fn closed(&self) -> ConnectionError {
445 {
446 let conn = self.0.state.lock("closed");
447 if let Some(error) = conn.error.as_ref() {
448 return error.clone();
449 }
450 // Construct the future while the lock is held to ensure we can't miss a wakeup if
451 // the `Notify` is signaled immediately after we release the lock. `await` it after
452 // the lock guard is out of scope.
453 self.0.shared.closed.notified()
454 }
455 .await;
456 self.0
457 .state
458 .lock("closed")
459 .error
460 .as_ref()
461 .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
462 .clone()
463 }
464
465 /// If the connection is closed, the reason why.
466 ///
467 /// Returns `None` if the connection is still open.
468 pub fn close_reason(&self) -> Option<ConnectionError> {
469 self.0.state.lock("close_reason").error.clone()
470 }
471
472 /// Close the connection immediately.
473 ///
474 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
475 /// more data is sent to the peer and the peer may drop buffered data upon receiving
476 /// the CONNECTION_CLOSE frame.
477 ///
478 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
479 ///
480 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
481 /// is preserved in full, it should be kept under 1KiB.
482 ///
483 /// # Gracefully closing a connection
484 ///
485 /// Only the peer last receiving application data can be certain that all data is
486 /// delivered. The only reliable action it can then take is to close the connection,
487 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
488 /// frame is very likely if both endpoints stay online long enough, and
489 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
490 /// remote peer will time out the connection, provided that the idle timeout is not
491 /// disabled.
492 ///
493 /// The sending side can not guarantee all stream data is delivered to the remote
494 /// application. It only knows the data is delivered to the QUIC stack of the remote
495 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
496 /// [`close()`] the remote endpoint may drop any data it received but is as yet
497 /// undelivered to the application, including data that was acknowledged as received to
498 /// the local endpoint.
499 ///
500 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
501 /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
502 /// [`close()`]: Connection::close
503 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
504 let conn = &mut *self.0.state.lock("close");
505 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
506 }
507
508 /// Transmit `data` as an unreliable, unordered application datagram
509 ///
510 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
511 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
512 /// dictated by the peer.
513 ///
514 /// Previously queued datagrams which are still unsent may be discarded to make space for this
515 /// datagram, in order of oldest to newest.
516 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
517 let conn = &mut *self.0.state.lock("send_datagram");
518 if let Some(ref x) = conn.error {
519 return Err(SendDatagramError::ConnectionLost(x.clone()));
520 }
521 use crate::SendDatagramError::*;
522 match conn.inner.datagrams().send(data, true) {
523 Ok(()) => {
524 conn.wake();
525 Ok(())
526 }
527 Err(e) => Err(match e {
528 Blocked(..) => unreachable!(),
529 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
530 Disabled => SendDatagramError::Disabled,
531 TooLarge => SendDatagramError::TooLarge,
532 }),
533 }
534 }
535
536 /// Transmit `data` as an unreliable, unordered application datagram
537 ///
538 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
539 /// conditions, which effectively prioritizes old datagrams over new datagrams.
540 ///
541 /// See [`send_datagram()`] for details.
542 ///
543 /// [`send_datagram()`]: Connection::send_datagram
544 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
545 SendDatagram {
546 conn: &self.0,
547 data: Some(data),
548 notify: self.0.shared.datagrams_unblocked.notified(),
549 }
550 }
551
552 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
553 ///
554 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
555 ///
556 /// This may change over the lifetime of a connection according to variation in the path MTU
557 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
558 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
559 ///
560 /// Not necessarily the maximum size of received datagrams.
561 ///
562 /// [`send_datagram()`]: Connection::send_datagram
563 pub fn max_datagram_size(&self) -> Option<usize> {
564 self.0
565 .state
566 .lock("max_datagram_size")
567 .inner
568 .datagrams()
569 .max_size()
570 }
571
572 /// Bytes available in the outgoing datagram buffer
573 ///
574 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
575 /// at most this size is guaranteed not to cause older datagrams to be dropped.
576 pub fn datagram_send_buffer_space(&self) -> usize {
577 self.0
578 .state
579 .lock("datagram_send_buffer_space")
580 .inner
581 .datagrams()
582 .send_buffer_space()
583 }
584
585 /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
586 pub fn send_nat_address_advertisement(
587 &self,
588 address: SocketAddr,
589 priority: u32,
590 ) -> Result<u64, crate::ConnectionError> {
591 let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
592 conn.inner.send_nat_address_advertisement(address, priority)
593 }
594
595 /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
596 pub fn send_nat_punch_coordination(
597 &self,
598 paired_with_sequence_number: u64,
599 address: SocketAddr,
600 round: u32,
601 ) -> Result<(), crate::ConnectionError> {
602 let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
603 conn.inner
604 .send_nat_punch_coordination(paired_with_sequence_number, address, round)
605 }
606
607 /// The side of the connection (client or server)
608 pub fn side(&self) -> Side {
609 self.0.state.lock("side").inner.side()
610 }
611
612 /// The peer's UDP address
613 ///
614 /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
615 /// switching to a cellular internet connection.
616 pub fn remote_address(&self) -> SocketAddr {
617 self.0.state.lock("remote_address").inner.remote_address()
618 }
619
620 /// The external/reflexive address observed by the remote peer
621 ///
622 /// Returns the address that the remote peer has observed for this connection,
623 /// as reported via OBSERVED_ADDRESS frames. This is useful for NAT traversal
624 /// to learn the public address of this endpoint as seen by others.
625 ///
626 /// Returns `None` if:
627 /// - Address discovery is not enabled
628 /// - No OBSERVED_ADDRESS frame has been received yet
629 /// - The connection hasn't completed the handshake
630 pub fn observed_address(&self) -> Option<SocketAddr> {
631 self.0
632 .state
633 .lock("observed_address")
634 .inner
635 .observed_address()
636 }
637
638 /// The local IP address which was used when the peer established
639 /// the connection
640 ///
641 /// This can be different from the address the endpoint is bound to, in case
642 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
643 ///
644 /// This will return `None` for clients, or when the platform does not expose this
645 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
646 /// supported platforms when using quinn_udp for I/O, which is the default.
647 pub fn local_ip(&self) -> Option<IpAddr> {
648 self.0.state.lock("local_ip").inner.local_ip()
649 }
650
651 /// Current best estimate of this connection's latency (round-trip-time)
652 pub fn rtt(&self) -> Duration {
653 self.0.state.lock("rtt").inner.rtt()
654 }
655
656 /// Returns connection statistics
657 pub fn stats(&self) -> ConnectionStats {
658 self.0.state.lock("stats").inner.stats()
659 }
660
661 /// Current state of the congestion control algorithm, for debugging purposes
662 pub fn congestion_state(&self) -> Box<dyn Controller> {
663 self.0
664 .state
665 .lock("congestion_state")
666 .inner
667 .congestion_state()
668 .clone_box()
669 }
670
671 /// Parameters negotiated during the handshake
672 ///
673 /// Guaranteed to return `Some` on fully established connections or after
674 /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
675 /// the returned value.
676 ///
677 /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
678 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
679 self.0
680 .state
681 .lock("handshake_data")
682 .inner
683 .crypto_session()
684 .handshake_data()
685 }
686
687 /// Cryptographic identity of the peer
688 ///
689 /// The dynamic type returned is determined by the configured
690 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
691 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
692 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
693 self.0
694 .state
695 .lock("peer_identity")
696 .inner
697 .crypto_session()
698 .peer_identity()
699 }
700
701 /// A stable identifier for this connection
702 ///
703 /// Peer addresses and connection IDs can change, but this value will remain
704 /// fixed for the lifetime of the connection.
705 pub fn stable_id(&self) -> usize {
706 self.0.stable_id()
707 }
708
709 /// Returns true if this connection negotiated post-quantum settings.
710 ///
711 /// This reflects either explicit PQC algorithms advertised via transport
712 /// parameters or in-band detection from handshake CRYPTO frames.
713 pub fn is_pqc(&self) -> bool {
714 let state = self.0.state.lock("is_pqc");
715 state.inner.is_pqc()
716 }
717
718 /// Debug-only hint: returns true when the underlying TLS provider was
719 /// configured to run in KEM-only (ML‑KEM) mode. This is a diagnostic aid
720 /// for tests and does not itself guarantee group enforcement.
721 pub fn debug_kem_only(&self) -> bool {
722 crate::crypto::rustls::debug_kem_only_enabled()
723 }
724
725 /// Update traffic keys spontaneously
726 ///
727 /// This primarily exists for testing purposes.
728 pub fn force_key_update(&self) {
729 self.0
730 .state
731 .lock("force_key_update")
732 .inner
733 .force_key_update()
734 }
735
736 /// Derive keying material from this connection's TLS session secrets.
737 ///
738 /// When both peers call this method with the same `label` and `context`
739 /// arguments and `output` buffers of equal length, they will get the
740 /// same sequence of bytes in `output`. These bytes are cryptographically
741 /// strong and pseudorandom, and are suitable for use as keying material.
742 ///
743 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
744 pub fn export_keying_material(
745 &self,
746 output: &mut [u8],
747 label: &[u8],
748 context: &[u8],
749 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
750 self.0
751 .state
752 .lock("export_keying_material")
753 .inner
754 .crypto_session()
755 .export_keying_material(output, label, context)
756 }
757
758 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
759 ///
760 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
761 /// `count`s increase both minimum and worst-case memory consumption.
762 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
763 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
764 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
765 // May need to send MAX_STREAMS to make progress
766 conn.wake();
767 }
768
769 /// See [`crate::TransportConfig::receive_window()`]
770 pub fn set_receive_window(&self, receive_window: VarInt) {
771 let mut conn = self.0.state.lock("set_receive_window");
772 conn.inner.set_receive_window(receive_window);
773 conn.wake();
774 }
775
776 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
777 ///
778 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
779 /// `count`s increase both minimum and worst-case memory consumption.
780 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
781 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
782 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
783 // May need to send MAX_STREAMS to make progress
784 conn.wake();
785 }
786
787 /// Set up qlog for this connection.
788 #[cfg(feature = "__qlog")]
789 pub fn set_qlog(
790 &mut self,
791 writer: Box<dyn std::io::Write + Send + Sync>,
792 title: Option<String>,
793 description: Option<String>,
794 ) {
795 let mut state = self.0.state.lock("__qlog");
796 state
797 .inner
798 .set_qlog(writer, title, description, Instant::now());
799 }
800}
801
802pin_project! {
803 /// Future produced by [`Connection::open_uni`]
804 pub struct OpenUni<'a> {
805 conn: &'a ConnectionRef,
806 #[pin]
807 notify: Notified<'a>,
808 }
809}
810
811impl Future for OpenUni<'_> {
812 type Output = Result<SendStream, ConnectionError>;
813 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
814 let this = self.project();
815 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
816 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
817 }
818}
819
820pin_project! {
821 /// Future produced by [`Connection::open_bi`]
822 pub struct OpenBi<'a> {
823 conn: &'a ConnectionRef,
824 #[pin]
825 notify: Notified<'a>,
826 }
827}
828
829impl Future for OpenBi<'_> {
830 type Output = Result<(SendStream, RecvStream), ConnectionError>;
831 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
832 let this = self.project();
833 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
834
835 Poll::Ready(Ok((
836 SendStream::new(conn.clone(), id, is_0rtt),
837 RecvStream::new(conn, id, is_0rtt),
838 )))
839 }
840}
841
842fn poll_open<'a>(
843 ctx: &mut Context<'_>,
844 conn: &'a ConnectionRef,
845 mut notify: Pin<&mut Notified<'a>>,
846 dir: Dir,
847) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
848 let mut state = conn.state.lock("poll_open");
849 if let Some(ref e) = state.error {
850 return Poll::Ready(Err(e.clone()));
851 } else if let Some(id) = state.inner.streams().open(dir) {
852 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
853 drop(state); // Release the lock so clone can take it
854 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
855 }
856 loop {
857 match notify.as_mut().poll(ctx) {
858 // `state` lock ensures we didn't race with readiness
859 Poll::Pending => return Poll::Pending,
860 // Spurious wakeup, get a new future
861 Poll::Ready(()) => {
862 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
863 }
864 }
865 }
866}
867
868pin_project! {
869 /// Future produced by [`Connection::accept_uni`]
870 pub struct AcceptUni<'a> {
871 conn: &'a ConnectionRef,
872 #[pin]
873 notify: Notified<'a>,
874 }
875}
876
877impl Future for AcceptUni<'_> {
878 type Output = Result<RecvStream, ConnectionError>;
879
880 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
881 let this = self.project();
882 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
883 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
884 }
885}
886
887pin_project! {
888 /// Future produced by [`Connection::accept_bi`]
889 pub struct AcceptBi<'a> {
890 conn: &'a ConnectionRef,
891 #[pin]
892 notify: Notified<'a>,
893 }
894}
895
896impl Future for AcceptBi<'_> {
897 type Output = Result<(SendStream, RecvStream), ConnectionError>;
898
899 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
900 let this = self.project();
901 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
902 Poll::Ready(Ok((
903 SendStream::new(conn.clone(), id, is_0rtt),
904 RecvStream::new(conn, id, is_0rtt),
905 )))
906 }
907}
908
909fn poll_accept<'a>(
910 ctx: &mut Context<'_>,
911 conn: &'a ConnectionRef,
912 mut notify: Pin<&mut Notified<'a>>,
913 dir: Dir,
914) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
915 let mut state = conn.state.lock("poll_accept");
916 // Check for incoming streams before checking `state.error` so that already-received streams,
917 // which are necessarily finite, can be drained from a closed connection.
918 if let Some(id) = state.inner.streams().accept(dir) {
919 let is_0rtt = state.inner.is_handshaking();
920 state.wake(); // To send additional stream ID credit
921 drop(state); // Release the lock so clone can take it
922 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
923 } else if let Some(ref e) = state.error {
924 return Poll::Ready(Err(e.clone()));
925 }
926 loop {
927 match notify.as_mut().poll(ctx) {
928 // `state` lock ensures we didn't race with readiness
929 Poll::Pending => return Poll::Pending,
930 // Spurious wakeup, get a new future
931 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
932 }
933 }
934}
935
936pin_project! {
937 /// Future produced by [`Connection::read_datagram`]
938 pub struct ReadDatagram<'a> {
939 conn: &'a ConnectionRef,
940 #[pin]
941 notify: Notified<'a>,
942 }
943}
944
945impl Future for ReadDatagram<'_> {
946 type Output = Result<Bytes, ConnectionError>;
947 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
948 let mut this = self.project();
949 let mut state = this.conn.state.lock("ReadDatagram::poll");
950 // Check for buffered datagrams before checking `state.error` so that already-received
951 // datagrams, which are necessarily finite, can be drained from a closed connection.
952 match state.inner.datagrams().recv() {
953 Some(x) => {
954 return Poll::Ready(Ok(x));
955 }
956 _ => {
957 if let Some(ref e) = state.error {
958 return Poll::Ready(Err(e.clone()));
959 }
960 }
961 }
962 loop {
963 match this.notify.as_mut().poll(ctx) {
964 // `state` lock ensures we didn't race with readiness
965 Poll::Pending => return Poll::Pending,
966 // Spurious wakeup, get a new future
967 Poll::Ready(()) => this
968 .notify
969 .set(this.conn.shared.datagram_received.notified()),
970 }
971 }
972 }
973}
974
975pin_project! {
976 /// Future produced by [`Connection::send_datagram_wait`]
977 pub struct SendDatagram<'a> {
978 conn: &'a ConnectionRef,
979 data: Option<Bytes>,
980 #[pin]
981 notify: Notified<'a>,
982 }
983}
984
985impl Future for SendDatagram<'_> {
986 type Output = Result<(), SendDatagramError>;
987 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
988 let mut this = self.project();
989 let mut state = this.conn.state.lock("SendDatagram::poll");
990 if let Some(ref e) = state.error {
991 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
992 }
993 use crate::SendDatagramError::*;
994 match state.inner.datagrams().send(
995 this.data.take().ok_or_else(|| {
996 error!("SendDatagram future polled without data");
997 SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
998 })?,
999 false,
1000 ) {
1001 Ok(()) => {
1002 state.wake();
1003 Poll::Ready(Ok(()))
1004 }
1005 Err(e) => Poll::Ready(Err(match e {
1006 Blocked(data) => {
1007 this.data.replace(data);
1008 loop {
1009 match this.notify.as_mut().poll(ctx) {
1010 Poll::Pending => return Poll::Pending,
1011 // Spurious wakeup, get a new future
1012 Poll::Ready(()) => this
1013 .notify
1014 .set(this.conn.shared.datagrams_unblocked.notified()),
1015 }
1016 }
1017 }
1018 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1019 Disabled => SendDatagramError::Disabled,
1020 TooLarge => SendDatagramError::TooLarge,
1021 })),
1022 }
1023 }
1024}
1025
1026#[derive(Debug)]
1027pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1028
1029impl ConnectionRef {
1030 #[allow(clippy::too_many_arguments)]
1031 fn new(
1032 handle: ConnectionHandle,
1033 conn: crate::Connection,
1034 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1035 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1036 on_handshake_data: oneshot::Sender<()>,
1037 on_connected: oneshot::Sender<bool>,
1038 socket: Arc<dyn AsyncUdpSocket>,
1039 runtime: Arc<dyn Runtime>,
1040 ) -> Self {
1041 Self(Arc::new(ConnectionInner {
1042 state: Mutex::new(State {
1043 inner: conn,
1044 driver: None,
1045 handle,
1046 on_handshake_data: Some(on_handshake_data),
1047 on_connected: Some(on_connected),
1048 connected: false,
1049 timer: None,
1050 timer_deadline: None,
1051 conn_events,
1052 endpoint_events,
1053 blocked_writers: FxHashMap::default(),
1054 blocked_readers: FxHashMap::default(),
1055 stopped: FxHashMap::default(),
1056 error: None,
1057 ref_count: 0,
1058 io_poller: socket.clone().create_io_poller(),
1059 socket,
1060 runtime,
1061 send_buffer: Vec::new(),
1062 buffered_transmit: None,
1063 binding_started: false,
1064 }),
1065 shared: Shared::default(),
1066 }))
1067 }
1068
1069 fn stable_id(&self) -> usize {
1070 &*self.0 as *const _ as usize
1071 }
1072}
1073
1074impl Clone for ConnectionRef {
1075 fn clone(&self) -> Self {
1076 self.state.lock("clone").ref_count += 1;
1077 Self(self.0.clone())
1078 }
1079}
1080
1081impl Drop for ConnectionRef {
1082 fn drop(&mut self) {
1083 let conn = &mut *self.state.lock("drop");
1084 if let Some(x) = conn.ref_count.checked_sub(1) {
1085 conn.ref_count = x;
1086 if x == 0 && !conn.inner.is_closed() {
1087 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1088 // not, we can't do any harm. If there were any streams being opened, then either
1089 // the connection will be closed for an unrelated reason or a fresh reference will
1090 // be constructed for the newly opened stream.
1091 conn.implicit_close(&self.shared);
1092 }
1093 }
1094 }
1095}
1096
1097impl std::ops::Deref for ConnectionRef {
1098 type Target = ConnectionInner;
1099 fn deref(&self) -> &Self::Target {
1100 &self.0
1101 }
1102}
1103
1104#[derive(Debug)]
1105pub(crate) struct ConnectionInner {
1106 pub(crate) state: Mutex<State>,
1107 pub(crate) shared: Shared,
1108}
1109
1110#[derive(Debug, Default)]
1111pub(crate) struct Shared {
1112 /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1113 /// control budget
1114 stream_budget_available: [Notify; 2],
1115 /// Notified when the peer has initiated a new stream
1116 stream_incoming: [Notify; 2],
1117 datagram_received: Notify,
1118 datagrams_unblocked: Notify,
1119 closed: Notify,
1120}
1121
1122pub(crate) struct State {
1123 pub(crate) inner: crate::Connection,
1124 driver: Option<Waker>,
1125 handle: ConnectionHandle,
1126 on_handshake_data: Option<oneshot::Sender<()>>,
1127 on_connected: Option<oneshot::Sender<bool>>,
1128 connected: bool,
1129 timer: Option<Pin<Box<dyn AsyncTimer>>>,
1130 timer_deadline: Option<Instant>,
1131 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1132 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1133 pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1134 pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1135 pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1136 /// Always set to Some before the connection becomes drained
1137 pub(crate) error: Option<ConnectionError>,
1138 /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1139 ref_count: usize,
1140 socket: Arc<dyn AsyncUdpSocket>,
1141 io_poller: Pin<Box<dyn UdpPoller>>,
1142 runtime: Arc<dyn Runtime>,
1143 send_buffer: Vec<u8>,
1144 /// We buffer a transmit when the underlying I/O would block
1145 buffered_transmit: Option<crate::Transmit>,
1146 /// True once we've initiated automatic channel binding (if enabled)
1147 binding_started: bool,
1148}
1149
1150impl State {
1151 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1152 let now = self.runtime.now();
1153 let mut transmits = 0;
1154
1155 let max_datagrams = self
1156 .socket
1157 .max_transmit_segments()
1158 .min(MAX_TRANSMIT_SEGMENTS);
1159
1160 loop {
1161 // Retry the last transmit, or get a new one.
1162 let t = match self.buffered_transmit.take() {
1163 Some(t) => t,
1164 None => {
1165 self.send_buffer.clear();
1166 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1167 match self
1168 .inner
1169 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1170 {
1171 Some(t) => {
1172 transmits += match t.segment_size {
1173 None => 1,
1174 Some(s) => t.size.div_ceil(s), // round up
1175 };
1176 t
1177 }
1178 None => break,
1179 }
1180 }
1181 };
1182
1183 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1184 // Retry after a future wakeup
1185 self.buffered_transmit = Some(t);
1186 return Ok(false);
1187 }
1188
1189 let len = t.size;
1190 let retry = match self
1191 .socket
1192 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1193 {
1194 Ok(()) => false,
1195 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1196 Err(e) => return Err(e),
1197 };
1198 if retry {
1199 // We thought the socket was writable, but it wasn't. Retry so that either another
1200 // `poll_writable` call determines that the socket is indeed not writable and
1201 // registers us for a wakeup, or the send succeeds if this really was just a
1202 // transient failure.
1203 self.buffered_transmit = Some(t);
1204 continue;
1205 }
1206
1207 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1208 // TODO: What isn't ideal here yet is that if we don't poll all
1209 // datagrams that could be sent we don't go into the `app_limited`
1210 // state and CWND continues to grow until we get here the next time.
1211 // See https://github.com/quinn-rs/quinn/issues/1126
1212 return Ok(true);
1213 }
1214 }
1215
1216 Ok(false)
1217 }
1218
1219 fn forward_endpoint_events(&mut self) {
1220 while let Some(event) = self.inner.poll_endpoint_events() {
1221 // If the endpoint driver is gone, noop.
1222 let _ = self.endpoint_events.send((self.handle, event));
1223 }
1224 }
1225
1226 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1227 fn process_conn_events(
1228 &mut self,
1229 shared: &Shared,
1230 cx: &mut Context,
1231 ) -> Result<(), ConnectionError> {
1232 loop {
1233 match self.conn_events.poll_recv(cx) {
1234 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1235 self.socket = socket;
1236 self.io_poller = self.socket.clone().create_io_poller();
1237 self.inner.local_address_changed();
1238 }
1239 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1240 self.inner.handle_event(event);
1241 }
1242 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1243 self.close(error_code, reason, shared);
1244 }
1245 Poll::Ready(None) => {
1246 return Err(ConnectionError::TransportError(crate::TransportError {
1247 code: crate::TransportErrorCode::INTERNAL_ERROR,
1248 frame: None,
1249 reason: "endpoint driver future was dropped".to_string(),
1250 }));
1251 }
1252 Poll::Pending => {
1253 return Ok(());
1254 }
1255 }
1256 }
1257 }
1258
1259 fn forward_app_events(&mut self, shared: &Shared) {
1260 while let Some(event) = self.inner.poll() {
1261 use crate::Event::*;
1262 match event {
1263 HandshakeDataReady => {
1264 if let Some(x) = self.on_handshake_data.take() {
1265 let _ = x.send(());
1266 }
1267 }
1268 Connected => {
1269 self.connected = true;
1270 if let Some(x) = self.on_connected.take() {
1271 // We don't care if the on-connected future was dropped
1272 let _ = x.send(self.inner.accepted_0rtt());
1273 }
1274 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1275 // Wake up rejected 0-RTT streams so they can fail immediately with
1276 // `ZeroRttRejected` errors.
1277 wake_all(&mut self.blocked_writers);
1278 wake_all(&mut self.blocked_readers);
1279 wake_all_notify(&mut self.stopped);
1280 }
1281 }
1282 ConnectionLost { reason } => {
1283 self.terminate(reason, shared);
1284 }
1285 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1286 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1287 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1288 }
1289 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1290 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1291 }
1292 DatagramReceived => {
1293 shared.datagram_received.notify_waiters();
1294 }
1295 DatagramsUnblocked => {
1296 shared.datagrams_unblocked.notify_waiters();
1297 }
1298 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1299 Stream(StreamEvent::Available { dir }) => {
1300 // Might mean any number of streams are ready, so we wake up everyone
1301 shared.stream_budget_available[dir as usize].notify_waiters();
1302 }
1303 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1304 Stream(StreamEvent::Stopped { id, .. }) => {
1305 wake_stream_notify(id, &mut self.stopped);
1306 wake_stream(id, &mut self.blocked_writers);
1307 }
1308 }
1309 }
1310 }
1311
1312 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1313 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1314 // timer is registered with the runtime (and check whether it's already
1315 // expired).
1316 match self.inner.poll_timeout() {
1317 Some(deadline) => {
1318 if let Some(delay) = &mut self.timer {
1319 // There is no need to reset the tokio timer if the deadline
1320 // did not change
1321 if self
1322 .timer_deadline
1323 .map(|current_deadline| current_deadline != deadline)
1324 .unwrap_or(true)
1325 {
1326 delay.as_mut().reset(deadline);
1327 }
1328 } else {
1329 self.timer = Some(self.runtime.new_timer(deadline));
1330 }
1331 // Store the actual expiration time of the timer
1332 self.timer_deadline = Some(deadline);
1333 }
1334 None => {
1335 self.timer_deadline = None;
1336 return false;
1337 }
1338 }
1339
1340 if self.timer_deadline.is_none() {
1341 return false;
1342 }
1343
1344 let delay = match self.timer.as_mut() {
1345 Some(timer) => timer.as_mut(),
1346 None => {
1347 error!("Timer missing in state where it should exist");
1348 return false;
1349 }
1350 };
1351 if delay.poll(cx).is_pending() {
1352 // Since there wasn't a timeout event, there is nothing new
1353 // for the connection to do
1354 return false;
1355 }
1356
1357 // A timer expired, so the caller needs to check for
1358 // new transmits, which might cause new timers to be set.
1359 self.inner.handle_timeout(self.runtime.now());
1360 self.timer_deadline = None;
1361 true
1362 }
1363
1364 /// Wake up a blocked `Driver` task to process I/O
1365 pub(crate) fn wake(&mut self) {
1366 if let Some(x) = self.driver.take() {
1367 x.wake();
1368 }
1369 }
1370
1371 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1372 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1373 self.error = Some(reason.clone());
1374 if let Some(x) = self.on_handshake_data.take() {
1375 let _ = x.send(());
1376 }
1377 wake_all(&mut self.blocked_writers);
1378 wake_all(&mut self.blocked_readers);
1379 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1380 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1381 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1382 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1383 shared.datagram_received.notify_waiters();
1384 shared.datagrams_unblocked.notify_waiters();
1385 if let Some(x) = self.on_connected.take() {
1386 let _ = x.send(false);
1387 }
1388 wake_all_notify(&mut self.stopped);
1389 shared.closed.notify_waiters();
1390 }
1391
1392 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1393 self.inner.close(self.runtime.now(), error_code, reason);
1394 self.terminate(ConnectionError::LocallyClosed, shared);
1395 self.wake();
1396 }
1397
1398 /// Close for a reason other than the application's explicit request
1399 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1400 self.close(0u32.into(), Bytes::new(), shared);
1401 }
1402
1403 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1404 if self.inner.is_handshaking()
1405 || self.inner.accepted_0rtt()
1406 || self.inner.side().is_server()
1407 {
1408 Ok(())
1409 } else {
1410 Err(())
1411 }
1412 }
1413}
1414
1415impl Drop for State {
1416 fn drop(&mut self) {
1417 if !self.inner.is_drained() {
1418 // Ensure the endpoint can tidy up
1419 let _ = self
1420 .endpoint_events
1421 .send((self.handle, crate::EndpointEvent::drained()));
1422 }
1423 }
1424}
1425
1426impl fmt::Debug for State {
1427 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1428 f.debug_struct("State").field("inner", &self.inner).finish()
1429 }
1430}
1431
1432fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1433 if let Some(waker) = wakers.remove(&stream_id) {
1434 waker.wake();
1435 }
1436}
1437
1438fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1439 wakers.drain().for_each(|(_, waker)| waker.wake())
1440}
1441
1442fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1443 if let Some(notify) = wakers.remove(&stream_id) {
1444 notify.notify_waiters()
1445 }
1446}
1447
1448fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1449 wakers
1450 .drain()
1451 .for_each(|(_, notify)| notify.notify_waiters())
1452}
1453
1454/// Errors that can arise when sending a datagram
1455#[derive(Debug, Error, Clone, Eq, PartialEq)]
1456pub enum SendDatagramError {
1457 /// The peer does not support receiving datagram frames
1458 #[error("datagrams not supported by peer")]
1459 UnsupportedByPeer,
1460 /// Datagram support is disabled locally
1461 #[error("datagram support disabled")]
1462 Disabled,
1463 /// The datagram is larger than the connection can currently accommodate
1464 ///
1465 /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1466 /// exceeded.
1467 #[error("datagram too large")]
1468 TooLarge,
1469 /// The connection was lost
1470 #[error("connection lost")]
1471 ConnectionLost(#[from] ConnectionError),
1472}
1473
1474/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
1475///
1476/// This limits the amount of CPU resources consumed by datagram generation,
1477/// and allows other tasks (like receiving ACKs) to run in between.
1478const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1479
1480/// The maximum amount of datagrams that are sent in a single transmit
1481///
1482/// This can be lower than the maximum platform capabilities, to avoid excessive
1483/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1484/// that numbers around 10 are a good compromise.
1485const MAX_TRANSMIT_SEGMENTS: usize = 10;