ant_quic/high_level/connection.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9 any::Any,
10 fmt,
11 future::Future,
12 io,
13 net::{IpAddr, SocketAddr},
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll, Waker, ready},
17};
18
19use bytes::Bytes;
20use pin_project_lite::pin_project;
21use rustc_hash::FxHashMap;
22use thiserror::Error;
23use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
24use tracing::{Instrument, Span, debug_span, error};
25
26use super::{
27 ConnectionEvent,
28 mutex::Mutex,
29 recv_stream::RecvStream,
30 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
31 send_stream::SendStream,
32 udp_transmit,
33};
34use crate::{
35 ConnectError, ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent,
36 Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
37};
38
/// In-progress connection attempt future
///
/// Awaiting this value yields a [`Connection`] on success and a [`ConnectionError`] otherwise.
#[derive(Debug)]
pub struct Connecting {
    // Handle to the connection state; becomes `None` once `poll` or `into_0rtt` has taken it.
    conn: Option<ConnectionRef>,
    // Receiving half of the oneshot channel whose sender is passed to `ConnectionRef::new` as the
    // "connected" notifier.
    connected: oneshot::Receiver<bool>,
    // Receiving half of the handshake-data notification; consumed by the first call to
    // `handshake_data`.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
46
47impl Connecting {
48 pub(crate) fn new(
49 handle: ConnectionHandle,
50 conn: crate::Connection,
51 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53 socket: Arc<dyn AsyncUdpSocket>,
54 runtime: Arc<dyn Runtime>,
55 ) -> Self {
56 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57 let (on_connected_send, on_connected_recv) = oneshot::channel();
58 let conn = ConnectionRef::new(
59 handle,
60 conn,
61 endpoint_events,
62 conn_events,
63 on_handshake_data_send,
64 on_connected_send,
65 socket,
66 runtime.clone(),
67 );
68
69 let driver = ConnectionDriver(conn.clone());
70 runtime.spawn(Box::pin(
71 async {
72 if let Err(e) = driver.await {
73 tracing::error!("I/O error: {e}");
74 }
75 }
76 .instrument(Span::current()),
77 ));
78
79 Self {
80 conn: Some(conn),
81 connected: on_connected_recv,
82 handshake_data_ready: Some(on_handshake_data_recv),
83 }
84 }
85
86 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87 ///
88 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89 /// If so, the returned [`Connection`] can be used to send application data without waiting for
90 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92 /// complete, at which point subsequently opened streams and written data will have full
93 /// cryptographic protection.
94 ///
95 /// ## Outgoing
96 ///
97 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103 /// If it was rejected, the existence of streams opened and other application data sent prior
104 /// to the handshake completing will not be conveyed to the remote application, and local
105 /// operations on them will return `ZeroRttRejected` errors.
106 ///
107 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108 /// relevant resumption state to be stored in the server, which servers may limit or lose for
109 /// various reasons including not persisting resumption state across server restarts.
110 ///
111 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112 /// implementation's docs for 0-RTT pitfalls.
113 ///
114 /// ## Incoming
115 ///
116 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118 ///
119 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120 /// implementation's docs for 0-RTT pitfalls.
121 ///
122 /// ## Security
123 ///
124 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125 /// replay attacks, and should therefore never invoke non-idempotent operations.
126 ///
127 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128 /// before TLS client authentication has occurred, and should therefore not be used to send
129 /// data for which client authentication is being used.
130 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132 // have to release it explicitly before returning `self` by value.
133 let conn = match self.conn.as_mut() {
134 Some(conn) => conn.state.lock("into_0rtt"),
135 None => {
136 return Err(self);
137 }
138 };
139
140 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
141 drop(conn);
142
143 if is_ok {
144 match self.conn.take() {
145 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
146 None => {
147 tracing::error!("Connection state missing during 0-RTT acceptance");
148 Err(self)
149 }
150 }
151 } else {
152 Err(self)
153 }
154 }
155
156 /// Parameters negotiated during the handshake
157 ///
158 /// The dynamic type returned is determined by the configured
159 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
160 /// be [`downcast`](Box::downcast) to a
161 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
162 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
163 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
164 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
165 // simple.
166 if let Some(x) = self.handshake_data_ready.take() {
167 let _ = x.await;
168 }
169 let conn = self.conn.as_ref().ok_or_else(|| {
170 tracing::error!("Connection state missing while retrieving handshake data");
171 ConnectionError::LocallyClosed
172 })?;
173 let inner = conn.state.lock("handshake");
174 inner
175 .inner
176 .crypto_session()
177 .handshake_data()
178 .ok_or_else(|| {
179 inner.error.clone().unwrap_or_else(|| {
180 error!("Spurious handshake data ready notification with no error");
181 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
182 "Spurious handshake notification".to_string(),
183 ))
184 })
185 })
186 }
187
188 /// The local IP address which was used when the peer established
189 /// the connection
190 ///
191 /// This can be different from the address the endpoint is bound to, in case
192 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
193 ///
194 /// This will return `None` for clients, or when the platform does not expose this
195 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
196 /// supported platforms when using quinn_udp for I/O, which is the default.
197 ///
198 /// Will panic if called after `poll` has returned `Ready`.
199 pub fn local_ip(&self) -> Option<IpAddr> {
200 let conn = self.conn.as_ref()?;
201 let inner = conn.state.lock("local_ip");
202
203 inner.inner.local_ip()
204 }
205
206 /// The peer's UDP address
207 ///
208 /// Returns an error if called after `poll` has returned `Ready`.
209 pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
210 let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
211 error!("Connection used after yielding Ready");
212 ConnectionError::LocallyClosed
213 })?;
214 Ok(conn_ref.state.lock("remote_address").inner.remote_address())
215 }
216}
217
218impl Future for Connecting {
219 type Output = Result<Connection, ConnectionError>;
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
221 Pin::new(&mut self.connected).poll(cx).map(|_| {
222 let conn = self.conn.take().ok_or_else(|| {
223 error!("Connection not available when connecting future resolves");
224 ConnectionError::LocallyClosed
225 })?;
226 let inner = conn.state.lock("connecting");
227 if inner.connected {
228 drop(inner);
229 Ok(Connection(conn))
230 } else {
231 Err(inner.error.clone().unwrap_or_else(|| {
232 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
233 "connection failed without error".to_string(),
234 ))
235 }))
236 }
237 })
238 }
239}
240
241/// Future that completes when a connection is fully established
242///
243/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
244/// value is meaningless.
245pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
246
247impl Future for ZeroRttAccepted {
248 type Output = bool;
249 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
250 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
251 }
252}
253
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// As implemented below, the only error this future yields is an I/O error returned by
/// `drive_transmit`. A failure while processing connection events terminates the connection and
/// completes the future with `Ok(())`, as does a connection that has fully drained. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    /// Run one round of connection processing: handle incoming events, transmit, service the
    /// timer, forward events, and optionally start channel binding.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The state lock is held for the remainder of this call.
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // A failure here ends the connection: record the error via `terminate` and finish the
        // driver without reporting an I/O error.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        // An I/O error from transmitting is propagated to whoever awaits the driver.
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens. The "loop" is
        // realised by waking this task again below whenever `keep_going` is set.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        // Kick off automatic channel binding once connected, if configured
        if conn.connected && !conn.binding_started {
            // `binding_started` is only set inside this branch, so when no global trust runtime
            // is available the check is repeated on later polls.
            if let Some(rt) = crate::trust::global_runtime() {
                // Delay NEW_TOKEN until binding completes
                conn.inner.set_delay_new_token_until_binding(true);

                // Each spawned task needs its own handle; only one of the two tasks below is
                // spawned, depending on the connection side.
                let hl_conn_server = Connection(self.0.clone());
                let hl_conn_client = hl_conn_server.clone();
                let store = rt.store.clone();
                let policy = rt.policy.clone();
                let signer = rt.local_signing_key.clone();
                let spki = rt.local_spki.clone();
                let runtime = conn.runtime.clone();

                if conn.inner.side().is_server() {
                    // Server: wait for and verify the peer's binding. On success record the
                    // peer id and lift the NEW_TOKEN delay; on failure close the connection.
                    runtime.spawn(Box::pin(async move {
                        match crate::trust::recv_verify_binding_ed25519(
                            &hl_conn_server,
                            &*store,
                            &policy,
                        )
                        .await
                        {
                            Ok(peer) => {
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("set peer")
                                    .inner
                                    .set_token_binding_peer_id(peer);
                                hl_conn_server
                                    .0
                                    .state
                                    .lock("allow tokens")
                                    .inner
                                    .set_delay_new_token_until_binding(false);
                            }
                            Err(_e) => {
                                hl_conn_server.close(0u32.into(), b"channel binding failed");
                            }
                        }
                    }));
                }

                if conn.inner.side().is_client() {
                    // Client: derive the exporter value and send the signed binding. Failures
                    // at either step are ignored here.
                    runtime.spawn(Box::pin(async move {
                        if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
                            let _ = crate::trust::send_binding_ed25519(
                                &hl_conn_client,
                                &exp,
                                &signer,
                                &spki,
                            )
                            .await;
                        }
                    }));
                }

                conn.binding_started = true;
            }
        }

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Otherwise park: store the waker in `conn.driver` so the driver can be woken
                // later.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Invariant check: reaching the drained state without a recorded error is a bug.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
365
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
// Newtype around the `ConnectionRef` handle to the connection state.
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
383
384impl Connection {
385 /// Initiate a new outgoing unidirectional stream.
386 ///
387 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
388 /// consequence, the peer won't be notified that a stream has been opened until the stream is
389 /// actually used.
390 pub fn open_uni(&self) -> OpenUni<'_> {
391 OpenUni {
392 conn: &self.0,
393 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
394 }
395 }
396
397 /// Initiate a new outgoing bidirectional stream.
398 ///
399 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
400 /// consequence, the peer won't be notified that a stream has been opened until the stream is
401 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
402 /// anything to [`SendStream`] will never succeed.
403 ///
404 /// [`open_bi()`]: Self::open_bi
405 /// [`SendStream`]: crate::SendStream
406 /// [`RecvStream`]: crate::RecvStream
407 pub fn open_bi(&self) -> OpenBi<'_> {
408 OpenBi {
409 conn: &self.0,
410 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
411 }
412 }
413
414 /// Accept the next incoming uni-directional stream
415 pub fn accept_uni(&self) -> AcceptUni<'_> {
416 AcceptUni {
417 conn: &self.0,
418 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
419 }
420 }
421
422 /// Accept the next incoming bidirectional stream
423 ///
424 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
425 /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
426 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
427 ///
428 /// [`accept_bi()`]: Self::accept_bi
429 /// [`open_bi()`]: Self::open_bi
430 /// [`SendStream`]: crate::SendStream
431 /// [`RecvStream`]: crate::RecvStream
432 pub fn accept_bi(&self) -> AcceptBi<'_> {
433 AcceptBi {
434 conn: &self.0,
435 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
436 }
437 }
438
439 /// Receive an application datagram
440 pub fn read_datagram(&self) -> ReadDatagram<'_> {
441 ReadDatagram {
442 conn: &self.0,
443 notify: self.0.shared.datagram_received.notified(),
444 }
445 }
446
447 /// Wait for the connection to be closed for any reason
448 ///
449 /// Despite the return type's name, closed connections are often not an error condition at the
450 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
451 /// and [`ConnectionError::ApplicationClosed`].
452 pub async fn closed(&self) -> ConnectionError {
453 {
454 let conn = self.0.state.lock("closed");
455 if let Some(error) = conn.error.as_ref() {
456 return error.clone();
457 }
458 // Construct the future while the lock is held to ensure we can't miss a wakeup if
459 // the `Notify` is signaled immediately after we release the lock. `await` it after
460 // the lock guard is out of scope.
461 self.0.shared.closed.notified()
462 }
463 .await;
464 self.0
465 .state
466 .lock("closed")
467 .error
468 .as_ref()
469 .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
470 .clone()
471 }
472
473 /// If the connection is closed, the reason why.
474 ///
475 /// Returns `None` if the connection is still open.
476 pub fn close_reason(&self) -> Option<ConnectionError> {
477 self.0.state.lock("close_reason").error.clone()
478 }
479
480 /// Close the connection immediately.
481 ///
482 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
483 /// more data is sent to the peer and the peer may drop buffered data upon receiving
484 /// the CONNECTION_CLOSE frame.
485 ///
486 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
487 ///
488 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
489 /// is preserved in full, it should be kept under 1KiB.
490 ///
491 /// # Gracefully closing a connection
492 ///
493 /// Only the peer last receiving application data can be certain that all data is
494 /// delivered. The only reliable action it can then take is to close the connection,
495 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
496 /// frame is very likely if both endpoints stay online long enough, and
497 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
498 /// remote peer will time out the connection, provided that the idle timeout is not
499 /// disabled.
500 ///
501 /// The sending side can not guarantee all stream data is delivered to the remote
502 /// application. It only knows the data is delivered to the QUIC stack of the remote
503 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
504 /// [`close()`] the remote endpoint may drop any data it received but is as yet
505 /// undelivered to the application, including data that was acknowledged as received to
506 /// the local endpoint.
507 ///
508 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
509 /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
510 /// [`close()`]: Connection::close
511 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
512 let conn = &mut *self.0.state.lock("close");
513 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
514 }
515
516 /// Transmit `data` as an unreliable, unordered application datagram
517 ///
518 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
519 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
520 /// dictated by the peer.
521 ///
522 /// Previously queued datagrams which are still unsent may be discarded to make space for this
523 /// datagram, in order of oldest to newest.
524 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
525 let conn = &mut *self.0.state.lock("send_datagram");
526 if let Some(ref x) = conn.error {
527 return Err(SendDatagramError::ConnectionLost(x.clone()));
528 }
529 use crate::SendDatagramError::*;
530 match conn.inner.datagrams().send(data, true) {
531 Ok(()) => {
532 conn.wake();
533 Ok(())
534 }
535 Err(e) => Err(match e {
536 Blocked(..) => unreachable!(),
537 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
538 Disabled => SendDatagramError::Disabled,
539 TooLarge => SendDatagramError::TooLarge,
540 }),
541 }
542 }
543
544 /// Transmit `data` as an unreliable, unordered application datagram
545 ///
546 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
547 /// conditions, which effectively prioritizes old datagrams over new datagrams.
548 ///
549 /// See [`send_datagram()`] for details.
550 ///
551 /// [`send_datagram()`]: Connection::send_datagram
552 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
553 SendDatagram {
554 conn: &self.0,
555 data: Some(data),
556 notify: self.0.shared.datagrams_unblocked.notified(),
557 }
558 }
559
560 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
561 ///
562 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
563 ///
564 /// This may change over the lifetime of a connection according to variation in the path MTU
565 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
566 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
567 ///
568 /// Not necessarily the maximum size of received datagrams.
569 ///
570 /// [`send_datagram()`]: Connection::send_datagram
571 pub fn max_datagram_size(&self) -> Option<usize> {
572 self.0
573 .state
574 .lock("max_datagram_size")
575 .inner
576 .datagrams()
577 .max_size()
578 }
579
580 /// Bytes available in the outgoing datagram buffer
581 ///
582 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
583 /// at most this size is guaranteed not to cause older datagrams to be dropped.
584 pub fn datagram_send_buffer_space(&self) -> usize {
585 self.0
586 .state
587 .lock("datagram_send_buffer_space")
588 .inner
589 .datagrams()
590 .send_buffer_space()
591 }
592
593 /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
594 pub fn send_nat_address_advertisement(
595 &self,
596 address: SocketAddr,
597 priority: u32,
598 ) -> Result<u64, crate::ConnectionError> {
599 let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
600 conn.inner.send_nat_address_advertisement(address, priority)
601 }
602
603 /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
604 pub fn send_nat_punch_coordination(
605 &self,
606 paired_with_sequence_number: u64,
607 address: SocketAddr,
608 round: u32,
609 ) -> Result<(), crate::ConnectionError> {
610 let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
611 conn.inner
612 .send_nat_punch_coordination(paired_with_sequence_number, address, round)
613 }
614
615 /// The side of the connection (client or server)
616 pub fn side(&self) -> Side {
617 self.0.state.lock("side").inner.side()
618 }
619
620 /// The peer's UDP address
621 ///
622 /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
623 /// switching to a cellular internet connection.
624 pub fn remote_address(&self) -> SocketAddr {
625 self.0.state.lock("remote_address").inner.remote_address()
626 }
627
628 /// The local IP address which was used when the peer established
629 /// the connection
630 ///
631 /// This can be different from the address the endpoint is bound to, in case
632 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
633 ///
634 /// This will return `None` for clients, or when the platform does not expose this
635 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
636 /// supported platforms when using quinn_udp for I/O, which is the default.
637 pub fn local_ip(&self) -> Option<IpAddr> {
638 self.0.state.lock("local_ip").inner.local_ip()
639 }
640
641 /// Current best estimate of this connection's latency (round-trip-time)
642 pub fn rtt(&self) -> Duration {
643 self.0.state.lock("rtt").inner.rtt()
644 }
645
646 /// Returns connection statistics
647 pub fn stats(&self) -> ConnectionStats {
648 self.0.state.lock("stats").inner.stats()
649 }
650
651 /// Current state of the congestion control algorithm, for debugging purposes
652 pub fn congestion_state(&self) -> Box<dyn Controller> {
653 self.0
654 .state
655 .lock("congestion_state")
656 .inner
657 .congestion_state()
658 .clone_box()
659 }
660
661 /// Parameters negotiated during the handshake
662 ///
663 /// Guaranteed to return `Some` on fully established connections or after
664 /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
665 /// the returned value.
666 ///
667 /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
668 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
669 self.0
670 .state
671 .lock("handshake_data")
672 .inner
673 .crypto_session()
674 .handshake_data()
675 }
676
677 /// Cryptographic identity of the peer
678 ///
679 /// The dynamic type returned is determined by the configured
680 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
681 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
682 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
683 self.0
684 .state
685 .lock("peer_identity")
686 .inner
687 .crypto_session()
688 .peer_identity()
689 }
690
691 /// A stable identifier for this connection
692 ///
693 /// Peer addresses and connection IDs can change, but this value will remain
694 /// fixed for the lifetime of the connection.
695 pub fn stable_id(&self) -> usize {
696 self.0.stable_id()
697 }
698
699 /// Returns true if this connection negotiated post-quantum settings.
700 ///
701 /// This reflects either explicit PQC algorithms advertised via transport
702 /// parameters or in-band detection from handshake CRYPTO frames.
703 pub fn is_pqc(&self) -> bool {
704 let state = self.0.state.lock("is_pqc");
705 state.inner.is_pqc()
706 }
707
708 /// Debug-only hint: returns true when the underlying TLS provider was
709 /// configured to run in KEM-only (ML‑KEM) mode. This is a diagnostic aid
710 /// for tests and does not itself guarantee group enforcement.
711 pub fn debug_kem_only(&self) -> bool {
712 #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
713 {
714 crate::crypto::rustls::debug_kem_only_enabled()
715 }
716 #[cfg(not(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring")))]
717 {
718 false
719 }
720 }
721
722 /// Update traffic keys spontaneously
723 ///
724 /// This primarily exists for testing purposes.
725 pub fn force_key_update(&self) {
726 self.0
727 .state
728 .lock("force_key_update")
729 .inner
730 .force_key_update()
731 }
732
733 /// Derive keying material from this connection's TLS session secrets.
734 ///
735 /// When both peers call this method with the same `label` and `context`
736 /// arguments and `output` buffers of equal length, they will get the
737 /// same sequence of bytes in `output`. These bytes are cryptographically
738 /// strong and pseudorandom, and are suitable for use as keying material.
739 ///
740 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
741 pub fn export_keying_material(
742 &self,
743 output: &mut [u8],
744 label: &[u8],
745 context: &[u8],
746 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
747 self.0
748 .state
749 .lock("export_keying_material")
750 .inner
751 .crypto_session()
752 .export_keying_material(output, label, context)
753 }
754
755 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
756 ///
757 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
758 /// `count`s increase both minimum and worst-case memory consumption.
759 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
760 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
761 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
762 // May need to send MAX_STREAMS to make progress
763 conn.wake();
764 }
765
766 /// See [`crate::TransportConfig::receive_window()`]
767 pub fn set_receive_window(&self, receive_window: VarInt) {
768 let mut conn = self.0.state.lock("set_receive_window");
769 conn.inner.set_receive_window(receive_window);
770 conn.wake();
771 }
772
773 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
774 ///
775 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
776 /// `count`s increase both minimum and worst-case memory consumption.
777 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
778 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
779 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
780 // May need to send MAX_STREAMS to make progress
781 conn.wake();
782 }
783
784 /// Set up qlog for this connection.
785 #[cfg(feature = "__qlog")]
786 pub fn set_qlog(
787 &mut self,
788 writer: Box<dyn std::io::Write + Send + Sync>,
789 title: Option<String>,
790 description: Option<String>,
791 ) {
792 let mut state = self.0.state.lock("__qlog");
793 state
794 .inner
795 .set_qlog(writer, title, description, Instant::now());
796 }
797}
798
799pin_project! {
800 /// Future produced by [`Connection::open_uni`]
801 pub struct OpenUni<'a> {
802 conn: &'a ConnectionRef,
803 #[pin]
804 notify: Notified<'a>,
805 }
806}
807
808impl Future for OpenUni<'_> {
809 type Output = Result<SendStream, ConnectionError>;
810 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
811 let this = self.project();
812 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
813 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
814 }
815}
816
817pin_project! {
818 /// Future produced by [`Connection::open_bi`]
819 pub struct OpenBi<'a> {
820 conn: &'a ConnectionRef,
821 #[pin]
822 notify: Notified<'a>,
823 }
824}
825
826impl Future for OpenBi<'_> {
827 type Output = Result<(SendStream, RecvStream), ConnectionError>;
828 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
829 let this = self.project();
830 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
831
832 Poll::Ready(Ok((
833 SendStream::new(conn.clone(), id, is_0rtt),
834 RecvStream::new(conn, id, is_0rtt),
835 )))
836 }
837}
838
839fn poll_open<'a>(
840 ctx: &mut Context<'_>,
841 conn: &'a ConnectionRef,
842 mut notify: Pin<&mut Notified<'a>>,
843 dir: Dir,
844) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
845 let mut state = conn.state.lock("poll_open");
846 if let Some(ref e) = state.error {
847 return Poll::Ready(Err(e.clone()));
848 } else if let Some(id) = state.inner.streams().open(dir) {
849 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
850 drop(state); // Release the lock so clone can take it
851 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
852 }
853 loop {
854 match notify.as_mut().poll(ctx) {
855 // `state` lock ensures we didn't race with readiness
856 Poll::Pending => return Poll::Pending,
857 // Spurious wakeup, get a new future
858 Poll::Ready(()) => {
859 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
860 }
861 }
862 }
863}
864
865pin_project! {
866 /// Future produced by [`Connection::accept_uni`]
867 pub struct AcceptUni<'a> {
868 conn: &'a ConnectionRef,
869 #[pin]
870 notify: Notified<'a>,
871 }
872}
873
874impl Future for AcceptUni<'_> {
875 type Output = Result<RecvStream, ConnectionError>;
876
877 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
878 let this = self.project();
879 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
880 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
881 }
882}
883
pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    ///
    /// Polling delegates to `poll_accept` with `Dir::Bi` and resolves to a
    /// `(SendStream, RecvStream)` pair sharing one stream ID, or to a `ConnectionError`.
    pub struct AcceptBi<'a> {
        // Connection streams are accepted from; `poll_accept` clones it for the returned handles.
        conn: &'a ConnectionRef,
        // Pending notification polled by `poll_accept`, which re-arms it from
        // `stream_incoming` each time it completes.
        #[pin]
        notify: Notified<'a>,
    }
}
892
893impl Future for AcceptBi<'_> {
894 type Output = Result<(SendStream, RecvStream), ConnectionError>;
895
896 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
897 let this = self.project();
898 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
899 Poll::Ready(Ok((
900 SendStream::new(conn.clone(), id, is_0rtt),
901 RecvStream::new(conn, id, is_0rtt),
902 )))
903 }
904}
905
906fn poll_accept<'a>(
907 ctx: &mut Context<'_>,
908 conn: &'a ConnectionRef,
909 mut notify: Pin<&mut Notified<'a>>,
910 dir: Dir,
911) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
912 let mut state = conn.state.lock("poll_accept");
913 // Check for incoming streams before checking `state.error` so that already-received streams,
914 // which are necessarily finite, can be drained from a closed connection.
915 if let Some(id) = state.inner.streams().accept(dir) {
916 let is_0rtt = state.inner.is_handshaking();
917 state.wake(); // To send additional stream ID credit
918 drop(state); // Release the lock so clone can take it
919 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
920 } else if let Some(ref e) = state.error {
921 return Poll::Ready(Err(e.clone()));
922 }
923 loop {
924 match notify.as_mut().poll(ctx) {
925 // `state` lock ensures we didn't race with readiness
926 Poll::Pending => return Poll::Pending,
927 // Spurious wakeup, get a new future
928 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
929 }
930 }
931}
932
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    ///
    /// Resolves to the next buffered datagram as `Bytes`, or to the connection's stored
    /// `ConnectionError` once no datagrams remain.
    pub struct ReadDatagram<'a> {
        // Connection whose datagram queue is read.
        conn: &'a ConnectionRef,
        // Pending notification; `poll` re-arms it from `datagram_received` each time it
        // completes.
        #[pin]
        notify: Notified<'a>,
    }
}
941
942impl Future for ReadDatagram<'_> {
943 type Output = Result<Bytes, ConnectionError>;
944 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
945 let mut this = self.project();
946 let mut state = this.conn.state.lock("ReadDatagram::poll");
947 // Check for buffered datagrams before checking `state.error` so that already-received
948 // datagrams, which are necessarily finite, can be drained from a closed connection.
949 match state.inner.datagrams().recv() {
950 Some(x) => {
951 return Poll::Ready(Ok(x));
952 }
953 _ => {
954 if let Some(ref e) = state.error {
955 return Poll::Ready(Err(e.clone()));
956 }
957 }
958 }
959 loop {
960 match this.notify.as_mut().poll(ctx) {
961 // `state` lock ensures we didn't race with readiness
962 Poll::Pending => return Poll::Pending,
963 // Spurious wakeup, get a new future
964 Poll::Ready(()) => this
965 .notify
966 .set(this.conn.shared.datagram_received.notified()),
967 }
968 }
969 }
970}
971
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    ///
    /// Resolves to `Ok(())` once the datagram has been handed to the connection, or to a
    /// `SendDatagramError` if sending fails or the connection is lost.
    pub struct SendDatagram<'a> {
        // Connection the datagram is sent on.
        conn: &'a ConnectionRef,
        // Payload to send; taken on each poll and put back when the send reports `Blocked`.
        data: Option<Bytes>,
        // Pending notification; `poll` re-arms it from `datagrams_unblocked` each time it
        // completes.
        #[pin]
        notify: Notified<'a>,
    }
}
981
982impl Future for SendDatagram<'_> {
983 type Output = Result<(), SendDatagramError>;
984 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
985 let mut this = self.project();
986 let mut state = this.conn.state.lock("SendDatagram::poll");
987 if let Some(ref e) = state.error {
988 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
989 }
990 use crate::SendDatagramError::*;
991 match state.inner.datagrams().send(
992 this.data.take().ok_or_else(|| {
993 error!("SendDatagram future polled without data");
994 SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
995 })?,
996 false,
997 ) {
998 Ok(()) => {
999 state.wake();
1000 Poll::Ready(Ok(()))
1001 }
1002 Err(e) => Poll::Ready(Err(match e {
1003 Blocked(data) => {
1004 this.data.replace(data);
1005 loop {
1006 match this.notify.as_mut().poll(ctx) {
1007 Poll::Pending => return Poll::Pending,
1008 // Spurious wakeup, get a new future
1009 Poll::Ready(()) => this
1010 .notify
1011 .set(this.conn.shared.datagrams_unblocked.notified()),
1012 }
1013 }
1014 }
1015 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1016 Disabled => SendDatagramError::Disabled,
1017 TooLarge => SendDatagramError::TooLarge,
1018 })),
1019 }
1020 }
1021}
1022
/// Reference-counted handle to a connection's shared state.
///
/// Wraps an `Arc<ConnectionInner>`. In addition to the `Arc`'s own count, `Clone` and `Drop`
/// maintain `State::ref_count` under the state lock; when that count reaches zero on a
/// connection that is not yet closed, the connection is closed implicitly.
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1025
1026impl ConnectionRef {
1027 #[allow(clippy::too_many_arguments)]
1028 fn new(
1029 handle: ConnectionHandle,
1030 conn: crate::Connection,
1031 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1032 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1033 on_handshake_data: oneshot::Sender<()>,
1034 on_connected: oneshot::Sender<bool>,
1035 socket: Arc<dyn AsyncUdpSocket>,
1036 runtime: Arc<dyn Runtime>,
1037 ) -> Self {
1038 Self(Arc::new(ConnectionInner {
1039 state: Mutex::new(State {
1040 inner: conn,
1041 driver: None,
1042 handle,
1043 on_handshake_data: Some(on_handshake_data),
1044 on_connected: Some(on_connected),
1045 connected: false,
1046 timer: None,
1047 timer_deadline: None,
1048 conn_events,
1049 endpoint_events,
1050 blocked_writers: FxHashMap::default(),
1051 blocked_readers: FxHashMap::default(),
1052 stopped: FxHashMap::default(),
1053 error: None,
1054 ref_count: 0,
1055 io_poller: socket.clone().create_io_poller(),
1056 socket,
1057 runtime,
1058 send_buffer: Vec::new(),
1059 buffered_transmit: None,
1060 binding_started: false,
1061 }),
1062 shared: Shared::default(),
1063 }))
1064 }
1065
1066 fn stable_id(&self) -> usize {
1067 &*self.0 as *const _ as usize
1068 }
1069}
1070
1071impl Clone for ConnectionRef {
1072 fn clone(&self) -> Self {
1073 self.state.lock("clone").ref_count += 1;
1074 Self(self.0.clone())
1075 }
1076}
1077
1078impl Drop for ConnectionRef {
1079 fn drop(&mut self) {
1080 let conn = &mut *self.state.lock("drop");
1081 if let Some(x) = conn.ref_count.checked_sub(1) {
1082 conn.ref_count = x;
1083 if x == 0 && !conn.inner.is_closed() {
1084 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1085 // not, we can't do any harm. If there were any streams being opened, then either
1086 // the connection will be closed for an unrelated reason or a fresh reference will
1087 // be constructed for the newly opened stream.
1088 conn.implicit_close(&self.shared);
1089 }
1090 }
1091 }
1092}
1093
1094impl std::ops::Deref for ConnectionRef {
1095 type Target = ConnectionInner;
1096 fn deref(&self) -> &Self::Target {
1097 &self.0
1098 }
1099}
1100
/// Data shared by every [`ConnectionRef`] to the same connection.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// Mutable connection state, accessed through the labelled `lock(..)` calls
    pub(crate) state: Mutex<State>,
    /// Notification primitives that futures wait on
    pub(crate) shared: Shared,
}
1106
/// Notification primitives signalled by `State` as events are forwarded; all of them except
/// `closed` are also signalled when the connection is terminated, and `closed` is signalled
/// only then.
///
/// The two-element arrays are indexed by `Dir as usize`.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    /// Notified when a `DatagramReceived` event is forwarded
    datagram_received: Notify,
    /// Notified when a `DatagramsUnblocked` event is forwarded
    datagrams_unblocked: Notify,
    /// Notified by `State::terminate`
    closed: Notify,
}
1118
/// Mutable per-connection state guarded by the mutex in [`ConnectionInner`].
pub(crate) struct State {
    /// The wrapped `crate::Connection` that this layer polls and feeds events to
    pub(crate) inner: crate::Connection,
    /// Waker taken and woken by `wake()`
    driver: Option<Waker>,
    /// Handle sent alongside every event forwarded to the endpoint
    handle: ConnectionHandle,
    /// Fired on `HandshakeDataReady`, or when the connection is terminated
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Fired with `accepted_0rtt()` on `Connected`, or with `false` on termination
    on_connected: Option<oneshot::Sender<bool>>,
    /// Set once a `Connected` event has been forwarded
    connected: bool,
    /// Runtime timer polled by `drive_timer`; created lazily on the first deadline
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    /// Deadline the timer was last armed with; cleared when it fires or no timeout is pending
    timer_deadline: Option<Instant>,
    /// Events addressed to this connection, drained by `process_conn_events`
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Channel used to forward endpoint events together with `handle`
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Wakers woken on `Writable`/`Stopped` stream events and on termination
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Wakers woken on `Readable` stream events and on termination
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Notifiers signalled on `Finished`/`Stopped` stream events and on termination
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    /// Socket used by `drive_transmit`; replaced on a `Rebind` event
    socket: Arc<dyn AsyncUdpSocket>,
    /// Writability poller created from `socket`; recreated on a `Rebind` event
    io_poller: Pin<Box<dyn UdpPoller>>,
    /// Source of the current time and of new timers
    runtime: Arc<dyn Runtime>,
    /// Scratch buffer that `poll_transmit` writes outgoing packets into
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<crate::Transmit>,
    /// True once we've initiated automatic channel binding (if enabled)
    binding_started: bool,
}
1146
1147impl State {
1148 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1149 let now = self.runtime.now();
1150 let mut transmits = 0;
1151
1152 let max_datagrams = self
1153 .socket
1154 .max_transmit_segments()
1155 .min(MAX_TRANSMIT_SEGMENTS);
1156
1157 loop {
1158 // Retry the last transmit, or get a new one.
1159 let t = match self.buffered_transmit.take() {
1160 Some(t) => t,
1161 None => {
1162 self.send_buffer.clear();
1163 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1164 match self
1165 .inner
1166 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1167 {
1168 Some(t) => {
1169 transmits += match t.segment_size {
1170 None => 1,
1171 Some(s) => t.size.div_ceil(s), // round up
1172 };
1173 t
1174 }
1175 None => break,
1176 }
1177 }
1178 };
1179
1180 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1181 // Retry after a future wakeup
1182 self.buffered_transmit = Some(t);
1183 return Ok(false);
1184 }
1185
1186 let len = t.size;
1187 let retry = match self
1188 .socket
1189 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1190 {
1191 Ok(()) => false,
1192 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1193 Err(e) => return Err(e),
1194 };
1195 if retry {
1196 // We thought the socket was writable, but it wasn't. Retry so that either another
1197 // `poll_writable` call determines that the socket is indeed not writable and
1198 // registers us for a wakeup, or the send succeeds if this really was just a
1199 // transient failure.
1200 self.buffered_transmit = Some(t);
1201 continue;
1202 }
1203
1204 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1205 // TODO: What isn't ideal here yet is that if we don't poll all
1206 // datagrams that could be sent we don't go into the `app_limited`
1207 // state and CWND continues to grow until we get here the next time.
1208 // See https://github.com/quinn-rs/quinn/issues/1126
1209 return Ok(true);
1210 }
1211 }
1212
1213 Ok(false)
1214 }
1215
1216 fn forward_endpoint_events(&mut self) {
1217 while let Some(event) = self.inner.poll_endpoint_events() {
1218 // If the endpoint driver is gone, noop.
1219 let _ = self.endpoint_events.send((self.handle, event));
1220 }
1221 }
1222
1223 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1224 fn process_conn_events(
1225 &mut self,
1226 shared: &Shared,
1227 cx: &mut Context,
1228 ) -> Result<(), ConnectionError> {
1229 loop {
1230 match self.conn_events.poll_recv(cx) {
1231 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1232 self.socket = socket;
1233 self.io_poller = self.socket.clone().create_io_poller();
1234 self.inner.local_address_changed();
1235 }
1236 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1237 self.inner.handle_event(event);
1238 }
1239 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1240 self.close(error_code, reason, shared);
1241 }
1242 Poll::Ready(None) => {
1243 return Err(ConnectionError::TransportError(crate::TransportError {
1244 code: crate::TransportErrorCode::INTERNAL_ERROR,
1245 frame: None,
1246 reason: "endpoint driver future was dropped".to_string(),
1247 }));
1248 }
1249 Poll::Pending => {
1250 return Ok(());
1251 }
1252 }
1253 }
1254 }
1255
1256 fn forward_app_events(&mut self, shared: &Shared) {
1257 while let Some(event) = self.inner.poll() {
1258 use crate::Event::*;
1259 match event {
1260 HandshakeDataReady => {
1261 if let Some(x) = self.on_handshake_data.take() {
1262 let _ = x.send(());
1263 }
1264 }
1265 Connected => {
1266 self.connected = true;
1267 if let Some(x) = self.on_connected.take() {
1268 // We don't care if the on-connected future was dropped
1269 let _ = x.send(self.inner.accepted_0rtt());
1270 }
1271 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1272 // Wake up rejected 0-RTT streams so they can fail immediately with
1273 // `ZeroRttRejected` errors.
1274 wake_all(&mut self.blocked_writers);
1275 wake_all(&mut self.blocked_readers);
1276 wake_all_notify(&mut self.stopped);
1277 }
1278 }
1279 ConnectionLost { reason } => {
1280 self.terminate(reason, shared);
1281 }
1282 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1283 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1284 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1285 }
1286 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1287 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1288 }
1289 DatagramReceived => {
1290 shared.datagram_received.notify_waiters();
1291 }
1292 DatagramsUnblocked => {
1293 shared.datagrams_unblocked.notify_waiters();
1294 }
1295 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1296 Stream(StreamEvent::Available { dir }) => {
1297 // Might mean any number of streams are ready, so we wake up everyone
1298 shared.stream_budget_available[dir as usize].notify_waiters();
1299 }
1300 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1301 Stream(StreamEvent::Stopped { id, .. }) => {
1302 wake_stream_notify(id, &mut self.stopped);
1303 wake_stream(id, &mut self.blocked_writers);
1304 }
1305 }
1306 }
1307 }
1308
1309 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1310 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1311 // timer is registered with the runtime (and check whether it's already
1312 // expired).
1313 match self.inner.poll_timeout() {
1314 Some(deadline) => {
1315 if let Some(delay) = &mut self.timer {
1316 // There is no need to reset the tokio timer if the deadline
1317 // did not change
1318 if self
1319 .timer_deadline
1320 .map(|current_deadline| current_deadline != deadline)
1321 .unwrap_or(true)
1322 {
1323 delay.as_mut().reset(deadline);
1324 }
1325 } else {
1326 self.timer = Some(self.runtime.new_timer(deadline));
1327 }
1328 // Store the actual expiration time of the timer
1329 self.timer_deadline = Some(deadline);
1330 }
1331 None => {
1332 self.timer_deadline = None;
1333 return false;
1334 }
1335 }
1336
1337 if self.timer_deadline.is_none() {
1338 return false;
1339 }
1340
1341 let delay = match self.timer.as_mut() {
1342 Some(timer) => timer.as_mut(),
1343 None => {
1344 error!("Timer missing in state where it should exist");
1345 return false;
1346 }
1347 };
1348 if delay.poll(cx).is_pending() {
1349 // Since there wasn't a timeout event, there is nothing new
1350 // for the connection to do
1351 return false;
1352 }
1353
1354 // A timer expired, so the caller needs to check for
1355 // new transmits, which might cause new timers to be set.
1356 self.inner.handle_timeout(self.runtime.now());
1357 self.timer_deadline = None;
1358 true
1359 }
1360
1361 /// Wake up a blocked `Driver` task to process I/O
1362 pub(crate) fn wake(&mut self) {
1363 if let Some(x) = self.driver.take() {
1364 x.wake();
1365 }
1366 }
1367
1368 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1369 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1370 self.error = Some(reason.clone());
1371 if let Some(x) = self.on_handshake_data.take() {
1372 let _ = x.send(());
1373 }
1374 wake_all(&mut self.blocked_writers);
1375 wake_all(&mut self.blocked_readers);
1376 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1377 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1378 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1379 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1380 shared.datagram_received.notify_waiters();
1381 shared.datagrams_unblocked.notify_waiters();
1382 if let Some(x) = self.on_connected.take() {
1383 let _ = x.send(false);
1384 }
1385 wake_all_notify(&mut self.stopped);
1386 shared.closed.notify_waiters();
1387 }
1388
1389 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1390 self.inner.close(self.runtime.now(), error_code, reason);
1391 self.terminate(ConnectionError::LocallyClosed, shared);
1392 self.wake();
1393 }
1394
1395 /// Close for a reason other than the application's explicit request
1396 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1397 self.close(0u32.into(), Bytes::new(), shared);
1398 }
1399
1400 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1401 if self.inner.is_handshaking()
1402 || self.inner.accepted_0rtt()
1403 || self.inner.side().is_server()
1404 {
1405 Ok(())
1406 } else {
1407 Err(())
1408 }
1409 }
1410}
1411
1412impl Drop for State {
1413 fn drop(&mut self) {
1414 if !self.inner.is_drained() {
1415 // Ensure the endpoint can tidy up
1416 let _ = self
1417 .endpoint_events
1418 .send((self.handle, crate::EndpointEvent::drained()));
1419 }
1420 }
1421}
1422
1423impl fmt::Debug for State {
1424 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1425 f.debug_struct("State").field("inner", &self.inner).finish()
1426 }
1427}
1428
1429fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1430 if let Some(waker) = wakers.remove(&stream_id) {
1431 waker.wake();
1432 }
1433}
1434
1435fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1436 wakers.drain().for_each(|(_, waker)| waker.wake())
1437}
1438
1439fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1440 if let Some(notify) = wakers.remove(&stream_id) {
1441 notify.notify_waiters()
1442 }
1443}
1444
1445fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1446 wakers
1447 .drain()
1448 .for_each(|(_, notify)| notify.notify_waiters())
1449}
1450
/// Errors that can arise when sending a datagram
///
/// The `Display` text of each variant comes from its `#[error(..)]` attribute.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    ///
    /// Carries the underlying [`ConnectionError`]; `#[from]` also provides the conversion
    /// from `ConnectionError` into this variant.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1470
/// The maximum number of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1476
/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;