ant_quic/high_level/connection.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9 any::Any,
10 fmt,
11 future::Future,
12 io,
13 net::{IpAddr, SocketAddr},
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll, Waker, ready},
17};
18
19use bytes::Bytes;
20use pin_project_lite::pin_project;
21use rustc_hash::FxHashMap;
22use thiserror::Error;
23use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
24use tracing::{Instrument, Span, debug_span, error};
25
26use super::{
27 ConnectionEvent,
28 mutex::Mutex,
29 recv_stream::RecvStream,
30 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
31 send_stream::SendStream,
32 udp_transmit,
33};
34use crate::{
35 ConnectError, ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent,
36 Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
37};
38
39/// In-progress connection attempt future
40#[derive(Debug)]
41pub struct Connecting {
42 conn: Option<ConnectionRef>,
43 connected: oneshot::Receiver<bool>,
44 handshake_data_ready: Option<oneshot::Receiver<()>>,
45}
46
47impl Connecting {
48 pub(crate) fn new(
49 handle: ConnectionHandle,
50 conn: crate::Connection,
51 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
52 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
53 socket: Arc<dyn AsyncUdpSocket>,
54 runtime: Arc<dyn Runtime>,
55 ) -> Self {
56 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
57 let (on_connected_send, on_connected_recv) = oneshot::channel();
58 let conn = ConnectionRef::new(
59 handle,
60 conn,
61 endpoint_events,
62 conn_events,
63 on_handshake_data_send,
64 on_connected_send,
65 socket,
66 runtime.clone(),
67 );
68
69 let driver = ConnectionDriver(conn.clone());
70 runtime.spawn(Box::pin(
71 async {
72 if let Err(e) = driver.await {
73 tracing::error!("I/O error: {e}");
74 }
75 }
76 .instrument(Span::current()),
77 ));
78
79 Self {
80 conn: Some(conn),
81 connected: on_connected_recv,
82 handshake_data_ready: Some(on_handshake_data_recv),
83 }
84 }
85
86 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87 ///
88 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89 /// If so, the returned [`Connection`] can be used to send application data without waiting for
90 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92 /// complete, at which point subsequently opened streams and written data will have full
93 /// cryptographic protection.
94 ///
95 /// ## Outgoing
96 ///
97 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103 /// If it was rejected, the existence of streams opened and other application data sent prior
104 /// to the handshake completing will not be conveyed to the remote application, and local
105 /// operations on them will return `ZeroRttRejected` errors.
106 ///
107 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108 /// relevant resumption state to be stored in the server, which servers may limit or lose for
109 /// various reasons including not persisting resumption state across server restarts.
110 ///
111 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112 /// implementation's docs for 0-RTT pitfalls.
113 ///
114 /// ## Incoming
115 ///
116 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118 ///
119 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120 /// implementation's docs for 0-RTT pitfalls.
121 ///
122 /// ## Security
123 ///
124 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125 /// replay attacks, and should therefore never invoke non-idempotent operations.
126 ///
127 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128 /// before TLS client authentication has occurred, and should therefore not be used to send
129 /// data for which client authentication is being used.
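///
/// ## Example
///
/// A minimal sketch of attempting 0-RTT on an outgoing connection (the surrounding endpoint
/// setup and the stream write helpers are assumptions, not part of this method):
///
/// ```ignore
/// // `connecting` is a `Connecting` returned by the endpoint's `connect` call.
/// match connecting.into_0rtt() {
///     Ok((connection, zero_rtt_accepted)) => {
///         // Data sent here travels as 0-RTT and may be replayed; keep it idempotent.
///         let mut send = connection.open_uni().await?;
///         send.write_all(b"idempotent early request").await?;
///         // Resolves when the handshake completes; `true` means the peer accepted 0-RTT.
///         if !zero_rtt_accepted.await {
///             // The early streams were never seen by the peer; re-send over 1-RTT streams.
///         }
///     }
///     Err(connecting) => {
///         // 0-RTT was not possible; fall back to the ordinary handshake.
///         let connection = connecting.await?;
///     }
/// }
/// ```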
130 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132 // have to release it explicitly before returning `self` by value.
133 let conn = match self.conn.as_mut() {
134 Some(conn) => conn.state.lock("into_0rtt"),
135 None => {
136 return Err(self);
137 }
138 };
139
140 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
141 drop(conn);
142
143 if is_ok {
144 match self.conn.take() {
145 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
146 None => {
147 tracing::error!("Connection state missing during 0-RTT acceptance");
148 Err(self)
149 }
150 }
151 } else {
152 Err(self)
153 }
154 }
155
156 /// Parameters negotiated during the handshake
157 ///
158 /// The dynamic type returned is determined by the configured
159 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
160 /// be [`downcast`](Box::downcast) to a
161 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
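///
/// A short sketch of inspecting the result (assumes the default `rustls` session; the path is
/// abbreviated and error handling is elided):
///
/// ```ignore
/// // `connecting` is a `Connecting` returned by the endpoint's `connect` call.
/// let data = connecting.handshake_data().await?;
/// if let Ok(handshake) = data.downcast::<crypto::rustls::HandshakeData>() {
///     // Negotiated parameters such as ALPN and server name are available here,
///     // possibly before the handshake has fully completed.
/// }
/// ```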
162 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
163 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
164 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
165 // simple.
166 if let Some(x) = self.handshake_data_ready.take() {
167 let _ = x.await;
168 }
169 let conn = self.conn.as_ref().ok_or_else(|| {
170 tracing::error!("Connection state missing while retrieving handshake data");
171 ConnectionError::LocallyClosed
172 })?;
173 let inner = conn.state.lock("handshake");
174 inner
175 .inner
176 .crypto_session()
177 .handshake_data()
178 .ok_or_else(|| {
179 inner.error.clone().unwrap_or_else(|| {
180 error!("Spurious handshake data ready notification with no error");
181 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
182 "Spurious handshake notification".to_string(),
183 ))
184 })
185 })
186 }
187
188 /// The local IP address which was used when the peer established
189 /// the connection
190 ///
191 /// This can be different from the address the endpoint is bound to, in case
192 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
193 ///
194 /// This will return `None` for clients, or when the platform does not expose this
195 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
196 /// supported platforms when using quinn_udp for I/O, which is the default.
197 ///
198 /// Returns `None` if called after `poll` has returned `Ready`.
199 pub fn local_ip(&self) -> Option<IpAddr> {
200 let conn = self.conn.as_ref()?;
201 let inner = conn.state.lock("local_ip");
202
203 inner.inner.local_ip()
204 }
205
206 /// The peer's UDP address
207 ///
208 /// Returns an error if called after `poll` has returned `Ready`.
209 pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
210 let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
211 error!("Connection used after yielding Ready");
212 ConnectionError::LocallyClosed
213 })?;
214 Ok(conn_ref.state.lock("remote_address").inner.remote_address())
215 }
216}
217
218impl Future for Connecting {
219 type Output = Result<Connection, ConnectionError>;
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
221 Pin::new(&mut self.connected).poll(cx).map(|_| {
222 let conn = self.conn.take().ok_or_else(|| {
223 error!("Connection not available when connecting future resolves");
224 ConnectionError::LocallyClosed
225 })?;
226 let inner = conn.state.lock("connecting");
227 if inner.connected {
228 drop(inner);
229 Ok(Connection(conn))
230 } else {
231 Err(inner.error.clone().unwrap_or_else(|| {
232 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
233 "connection failed without error".to_string(),
234 ))
235 }))
236 }
237 })
238 }
239}
240
241/// Future that completes when a connection is fully established
242///
243/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
244/// value is meaningless.
245pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
246
247impl Future for ZeroRttAccepted {
248 type Output = bool;
249 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
250 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
251 }
252}
253
254/// A future that drives protocol logic for a connection
255///
256/// This future handles the protocol logic for a single connection, routing events from the
257 /// `Connection` API object to the `Endpoint` task and the stream-related interfaces.
258/// It also keeps track of outstanding timeouts for the `Connection`.
259///
260/// If the connection encounters an error condition, this future will yield an error. It will
261/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
262/// connection-related futures, this waits for the draining period to complete to ensure that
263/// packets still in flight from the peer are handled gracefully.
264#[must_use = "connection drivers must be spawned for their connections to function"]
265#[derive(Debug)]
266struct ConnectionDriver(ConnectionRef);
267
268impl Future for ConnectionDriver {
269 type Output = Result<(), io::Error>;
270
271 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
272 let conn = &mut *self.0.state.lock("poll");
273
274 let span = debug_span!("drive", id = conn.handle.0);
275 let _guard = span.enter();
276
277 if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
278 conn.terminate(e, &self.0.shared);
279 return Poll::Ready(Ok(()));
280 }
281 let mut keep_going = conn.drive_transmit(cx)?;
282 // If a timer expires, there might be more to transmit. When we transmit something, we
283 // might need to reset a timer. Hence, we must loop until neither happens.
284 keep_going |= conn.drive_timer(cx);
285 conn.forward_endpoint_events();
286 conn.forward_app_events(&self.0.shared);
287
288 // Kick off automatic channel binding once connected, if configured
289 if conn.connected && !conn.binding_started {
290 if let Some(rt) = crate::trust::global_runtime() {
291 // Delay NEW_TOKEN until binding completes
292 conn.inner.set_delay_new_token_until_binding(true);
293
294 let hl_conn_server = Connection(self.0.clone());
295 let hl_conn_client = hl_conn_server.clone();
296 let store = rt.store.clone();
297 let policy = rt.policy.clone();
298 let signer = rt.local_signing_key.clone();
299 let spki = rt.local_spki.clone();
300 let runtime = conn.runtime.clone();
301
302 if conn.inner.side().is_server() {
303 runtime.spawn(Box::pin(async move {
304 match crate::trust::recv_verify_binding_ed25519(
305 &hl_conn_server,
306 &*store,
307 &policy,
308 )
309 .await
310 {
311 Ok(peer) => {
312 hl_conn_server
313 .0
314 .state
315 .lock("set peer")
316 .inner
317 .set_token_binding_peer_id(peer);
318 hl_conn_server
319 .0
320 .state
321 .lock("allow tokens")
322 .inner
323 .set_delay_new_token_until_binding(false);
324 }
325 Err(_e) => {
326 hl_conn_server.close(0u32.into(), b"channel binding failed");
327 }
328 }
329 }));
330 }
331
332 if conn.inner.side().is_client() {
333 runtime.spawn(Box::pin(async move {
334 if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
335 let _ = crate::trust::send_binding_ed25519(
336 &hl_conn_client,
337 &exp,
338 &signer,
339 &spki,
340 )
341 .await;
342 }
343 }));
344 }
345
346 conn.binding_started = true;
347 }
348 }
349
350 if !conn.inner.is_drained() {
351 if keep_going {
352 // If the connection hasn't processed all tasks, schedule it again
353 cx.waker().wake_by_ref();
354 } else {
355 conn.driver = Some(cx.waker().clone());
356 }
357 return Poll::Pending;
358 }
359 if conn.error.is_none() {
360 unreachable!("drained connections always have an error");
361 }
362 Poll::Ready(Ok(()))
363 }
364}
365
366/// A QUIC connection.
367///
368/// If all references to a connection (including every clone of the `Connection` handle, streams of
369/// incoming streams, and the various stream types) have been dropped, then the connection will be
370/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
371/// connection explicitly by calling [`Connection::close()`].
372///
373/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
374/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
375/// application. [`Connection::close()`] describes in more detail how to gracefully close a
376/// connection without losing application data.
377///
378/// May be cloned to obtain another handle to the same connection.
379///
380/// [`Connection::close()`]: Connection::close
381#[derive(Debug, Clone)]
382pub struct Connection(ConnectionRef);
383
384impl Connection {
385 /// Initiate a new outgoing unidirectional stream.
386 ///
387 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
388 /// consequence, the peer won't be notified that a stream has been opened until the stream is
389 /// actually used.
390 pub fn open_uni(&self) -> OpenUni<'_> {
391 OpenUni {
392 conn: &self.0,
393 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
394 }
395 }
396
397 /// Initiate a new outgoing bidirectional stream.
398 ///
399 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
400 /// consequence, the peer won't be notified that a stream has been opened until the stream is
401 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
402 /// anything to [`SendStream`] will never succeed.
403 ///
404 /// [`open_bi()`]: Self::open_bi
405 /// [`SendStream`]: crate::SendStream
406 /// [`RecvStream`]: crate::RecvStream
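///
/// A minimal sketch (assuming `conn` is an established [`Connection`]; `write_all` and
/// `read_to_end` are helpers assumed to exist on this crate's stream types):
///
/// ```ignore
/// let (mut send, mut recv) = conn.open_bi().await?;
/// // The peer only learns about this stream once data has been written to it.
/// send.write_all(b"request").await?;
/// let response = recv.read_to_end(64 * 1024).await?;
/// ```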
407 pub fn open_bi(&self) -> OpenBi<'_> {
408 OpenBi {
409 conn: &self.0,
410 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
411 }
412 }
413
414 /// Accept the next incoming uni-directional stream
415 pub fn accept_uni(&self) -> AcceptUni<'_> {
416 AcceptUni {
417 conn: &self.0,
418 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
419 }
420 }
421
422 /// Accept the next incoming bidirectional stream
423 ///
424 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
425 /// before the other `Connection` is able to [`accept_bi()`]. Calling [`open_bi()`] then
426 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
427 ///
428 /// [`accept_bi()`]: Self::accept_bi
429 /// [`open_bi()`]: Self::open_bi
430 /// [`SendStream`]: crate::SendStream
431 /// [`RecvStream`]: crate::RecvStream
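///
/// A matching accept-side sketch (assuming `conn` is an established [`Connection`]; stream
/// helpers as in the [`open_bi()`] example):
///
/// ```ignore
/// // Resolves only after the peer has opened the stream *and* written to it.
/// let (mut send, mut recv) = conn.accept_bi().await?;
/// let request = recv.read_to_end(64 * 1024).await?;
/// send.write_all(b"response").await?;
/// ```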
432 pub fn accept_bi(&self) -> AcceptBi<'_> {
433 AcceptBi {
434 conn: &self.0,
435 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
436 }
437 }
438
439 /// Receive an application datagram
440 pub fn read_datagram(&self) -> ReadDatagram<'_> {
441 ReadDatagram {
442 conn: &self.0,
443 notify: self.0.shared.datagram_received.notified(),
444 }
445 }
446
447 /// Wait for the connection to be closed for any reason
448 ///
449 /// Despite the return type's name, closed connections are often not an error condition at the
450 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
451 /// and [`ConnectionError::ApplicationClosed`].
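///
/// A small sketch of treating routine closes as success (assuming `conn` is a [`Connection`]
/// and `ConnectionError` is in scope):
///
/// ```ignore
/// match conn.closed().await {
///     ConnectionError::LocallyClosed | ConnectionError::ApplicationClosed { .. } => {
///         // Routine shutdown; nothing to report.
///     }
///     other => eprintln!("connection lost: {other}"),
/// }
/// ```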
452 pub async fn closed(&self) -> ConnectionError {
453 {
454 let conn = self.0.state.lock("closed");
455 if let Some(error) = conn.error.as_ref() {
456 return error.clone();
457 }
458 // Construct the future while the lock is held to ensure we can't miss a wakeup if
459 // the `Notify` is signaled immediately after we release the lock. `await` it after
460 // the lock guard is out of scope.
461 self.0.shared.closed.notified()
462 }
463 .await;
464 self.0
465 .state
466 .lock("closed")
467 .error
468 .as_ref()
469 .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
470 .clone()
471 }
472
473 /// If the connection is closed, the reason why.
474 ///
475 /// Returns `None` if the connection is still open.
476 pub fn close_reason(&self) -> Option<ConnectionError> {
477 self.0.state.lock("close_reason").error.clone()
478 }
479
480 /// Close the connection immediately.
481 ///
482 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
483 /// more data is sent to the peer and the peer may drop buffered data upon receiving
484 /// the CONNECTION_CLOSE frame.
485 ///
486 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
487 ///
488 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
489 /// is preserved in full, it should be kept under 1KiB.
490 ///
491 /// # Gracefully closing a connection
492 ///
493 /// Only the peer last receiving application data can be certain that all data is
494 /// delivered. The only reliable action it can then take is to close the connection,
495 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
496 /// frame is very likely if both endpoints stay online long enough, and
497 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
498 /// remote peer will time out the connection, provided that the idle timeout is not
499 /// disabled.
500 ///
501 /// The sending side can not guarantee all stream data is delivered to the remote
502 /// application. It only knows the data is delivered to the QUIC stack of the remote
503 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
504 /// [`close()`] the remote endpoint may drop any data it received but is as yet
505 /// undelivered to the application, including data that was acknowledged as received to
506 /// the local endpoint.
507 ///
508 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
509 /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
510 /// [`close()`]: Connection::close
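///
/// A sketch of the graceful pattern described above, on the side that received the last
/// application data (assuming `conn` and `endpoint` are this crate's connection and endpoint
/// handles):
///
/// ```ignore
/// // Signal the peer that we are done; code 0 and an empty reason are conventional for success.
/// conn.close(0u32.into(), b"");
/// // Give the CONNECTION_CLOSE frame a chance to be delivered before shutting down.
/// endpoint.wait_idle().await;
/// ```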
511 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
512 let conn = &mut *self.0.state.lock("close");
513 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
514 }
515
516 /// Transmit `data` as an unreliable, unordered application datagram
517 ///
518 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
519 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
520 /// dictated by the peer.
521 ///
522 /// Previously queued datagrams which are still unsent may be discarded to make space for this
523 /// datagram, in order of oldest to newest.
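///
/// A minimal send/receive sketch (assuming `conn` is an established [`Connection`]):
///
/// ```ignore
/// use bytes::Bytes;
///
/// // Fire-and-forget: the datagram may be dropped under congestion or if it is too large.
/// conn.send_datagram(Bytes::from_static(b"ping"))?;
/// // The receiving side drains datagrams as they arrive.
/// let payload = conn.read_datagram().await?;
/// ```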
524 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
525 let conn = &mut *self.0.state.lock("send_datagram");
526 if let Some(ref x) = conn.error {
527 return Err(SendDatagramError::ConnectionLost(x.clone()));
528 }
529 use crate::SendDatagramError::*;
530 match conn.inner.datagrams().send(data, true) {
531 Ok(()) => {
532 conn.wake();
533 Ok(())
534 }
535 Err(e) => Err(match e {
536 Blocked(..) => unreachable!(),
537 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
538 Disabled => SendDatagramError::Disabled,
539 TooLarge => SendDatagramError::TooLarge,
540 }),
541 }
542 }
543
544 /// Transmit `data` as an unreliable, unordered application datagram
545 ///
546 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
547 /// conditions, which effectively prioritizes old datagrams over new datagrams.
548 ///
549 /// See [`send_datagram()`] for details.
550 ///
551 /// [`send_datagram()`]: Connection::send_datagram
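///
/// A backpressure-aware sketch (assuming `conn` is an established [`Connection`]; `frames` is
/// a hypothetical iterator of byte buffers):
///
/// ```ignore
/// use bytes::Bytes;
///
/// for chunk in frames {
///     // Waits for buffer space instead of displacing older queued datagrams.
///     conn.send_datagram_wait(Bytes::from(chunk)).await?;
/// }
/// ```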
552 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
553 SendDatagram {
554 conn: &self.0,
555 data: Some(data),
556 notify: self.0.shared.datagrams_unblocked.notified(),
557 }
558 }
559
560 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
561 ///
562 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
563 ///
564 /// This may change over the lifetime of a connection according to variation in the path MTU
565 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
566 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
567 ///
568 /// Not necessarily the maximum size of received datagrams.
569 ///
570 /// [`send_datagram()`]: Connection::send_datagram
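///
/// A sketch of checking the current limit before sending (assuming `conn` is a [`Connection`]):
///
/// ```ignore
/// use bytes::Bytes;
///
/// let payload = Bytes::from_static(b"telemetry sample");
/// match conn.max_datagram_size() {
///     Some(max) if payload.len() <= max => conn.send_datagram(payload)?,
///     Some(_) => { /* currently too large; split the payload or skip it */ }
///     None => { /* datagrams unsupported by the peer or disabled locally */ }
/// }
/// ```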
571 pub fn max_datagram_size(&self) -> Option<usize> {
572 self.0
573 .state
574 .lock("max_datagram_size")
575 .inner
576 .datagrams()
577 .max_size()
578 }
579
580 /// Bytes available in the outgoing datagram buffer
581 ///
582 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
583 /// at most this size is guaranteed not to cause older datagrams to be dropped.
584 pub fn datagram_send_buffer_space(&self) -> usize {
585 self.0
586 .state
587 .lock("datagram_send_buffer_space")
588 .inner
589 .datagrams()
590 .send_buffer_space()
591 }
592
593 /// Queue an ADD_ADDRESS NAT traversal frame via the underlying connection
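///
/// A sketch of advertising a locally discovered address (assuming `conn` is an established
/// [`Connection`]; the address and priority are illustrative):
///
/// ```ignore
/// use std::net::SocketAddr;
///
/// let addr: SocketAddr = "203.0.113.7:443".parse()?;
/// // Returns the sequence number assigned to this ADD_ADDRESS advertisement.
/// let seq = conn.send_nat_address_advertisement(addr, 100)?;
/// ```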
594 pub fn send_nat_address_advertisement(
595 &self,
596 address: SocketAddr,
597 priority: u32,
598 ) -> Result<u64, crate::ConnectionError> {
599 let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
600 conn.inner.send_nat_address_advertisement(address, priority)
601 }
602
603 /// Queue a PUNCH_ME_NOW NAT traversal frame via the underlying connection
604 pub fn send_nat_punch_coordination(
605 &self,
606 paired_with_sequence_number: u64,
607 address: SocketAddr,
608 round: u32,
609 ) -> Result<(), crate::ConnectionError> {
610 let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
611 conn.inner
612 .send_nat_punch_coordination(paired_with_sequence_number, address, round)
613 }
614
615 /// The side of the connection (client or server)
616 pub fn side(&self) -> Side {
617 self.0.state.lock("side").inner.side()
618 }
619
620 /// The peer's UDP address
621 ///
622 /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
623 /// switching to a cellular internet connection.
624 pub fn remote_address(&self) -> SocketAddr {
625 self.0.state.lock("remote_address").inner.remote_address()
626 }
627
628 /// The external/reflexive address observed by the remote peer
629 ///
630 /// Returns the address that the remote peer has observed for this connection,
631 /// as reported via OBSERVED_ADDRESS frames. This is useful for NAT traversal
632 /// to learn the public address of this endpoint as seen by others.
633 ///
634 /// Returns `None` if:
635 /// - Address discovery is not enabled
636 /// - No OBSERVED_ADDRESS frame has been received yet
637 /// - The connection hasn't completed the handshake
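///
/// A sketch of using the observed address for NAT traversal signalling (assuming `conn` is an
/// established [`Connection`]):
///
/// ```ignore
/// if let Some(reflexive) = conn.observed_address() {
///     // This is the address the peer sees for us; it can be advertised to other peers,
///     // for example via `send_nat_address_advertisement`.
///     println!("publicly reachable at {reflexive}");
/// }
/// ```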
638 pub fn observed_address(&self) -> Option<SocketAddr> {
639 self.0
640 .state
641 .lock("observed_address")
642 .inner
643 .observed_address()
644 }
645
646 /// The local IP address which was used when the peer established
647 /// the connection
648 ///
649 /// This can be different from the address the endpoint is bound to, in case
650 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
651 ///
652 /// This will return `None` for clients, or when the platform does not expose this
653 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
654 /// supported platforms when using quinn_udp for I/O, which is the default.
655 pub fn local_ip(&self) -> Option<IpAddr> {
656 self.0.state.lock("local_ip").inner.local_ip()
657 }
658
659 /// Current best estimate of this connection's latency (round-trip-time)
660 pub fn rtt(&self) -> Duration {
661 self.0.state.lock("rtt").inner.rtt()
662 }
663
664 /// Returns connection statistics
665 pub fn stats(&self) -> ConnectionStats {
666 self.0.state.lock("stats").inner.stats()
667 }
668
669 /// Current state of the congestion control algorithm, for debugging purposes
670 pub fn congestion_state(&self) -> Box<dyn Controller> {
671 self.0
672 .state
673 .lock("congestion_state")
674 .inner
675 .congestion_state()
676 .clone_box()
677 }
678
679 /// Parameters negotiated during the handshake
680 ///
681 /// Guaranteed to return `Some` on fully established connections or after
682 /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
683 /// the returned value.
684 ///
685 /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
686 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
687 self.0
688 .state
689 .lock("handshake_data")
690 .inner
691 .crypto_session()
692 .handshake_data()
693 }
694
695 /// Cryptographic identity of the peer
696 ///
697 /// The dynamic type returned is determined by the configured
698 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
699 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
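///
/// A sketch of downcasting the peer's certificate chain (assumes the default `rustls` session):
///
/// ```ignore
/// use rustls::pki_types::CertificateDer;
///
/// if let Some(identity) = conn.peer_identity() {
///     if let Ok(certs) = identity.downcast::<Vec<CertificateDer<'static>>>() {
///         println!("peer presented {} certificate(s)", certs.len());
///     }
/// }
/// ```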
700 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
701 self.0
702 .state
703 .lock("peer_identity")
704 .inner
705 .crypto_session()
706 .peer_identity()
707 }
708
709 /// A stable identifier for this connection
710 ///
711 /// Peer addresses and connection IDs can change, but this value will remain
712 /// fixed for the lifetime of the connection.
713 pub fn stable_id(&self) -> usize {
714 self.0.stable_id()
715 }
716
717 /// Returns true if this connection negotiated post-quantum settings.
718 ///
719 /// This reflects either explicit PQC algorithms advertised via transport
720 /// parameters or in-band detection from handshake CRYPTO frames.
721 pub fn is_pqc(&self) -> bool {
722 let state = self.0.state.lock("is_pqc");
723 state.inner.is_pqc()
724 }
725
726 /// Debug-only hint: returns true when the underlying TLS provider was
727 /// configured to run in KEM-only (ML‑KEM) mode. This is a diagnostic aid
728 /// for tests and does not itself guarantee group enforcement.
729 pub fn debug_kem_only(&self) -> bool {
730 crate::crypto::rustls::debug_kem_only_enabled()
731 }
732
733 /// Update traffic keys spontaneously
734 ///
735 /// This primarily exists for testing purposes.
736 pub fn force_key_update(&self) {
737 self.0
738 .state
739 .lock("force_key_update")
740 .inner
741 .force_key_update()
742 }
743
744 /// Derive keying material from this connection's TLS session secrets.
745 ///
746 /// When both peers call this method with the same `label` and `context`
747 /// arguments and `output` buffers of equal length, they will get the
748 /// same sequence of bytes in `output`. These bytes are cryptographically
749 /// strong and pseudorandom, and are suitable for use as keying material.
750 ///
751 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
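///
/// A sketch of deriving a shared 32-byte key on both peers (assuming `conn` is an established
/// [`Connection`]; the label and context values are illustrative):
///
/// ```ignore
/// let mut key = [0u8; 32];
/// // Both peers must pass the same label, context, and output length to obtain the same bytes.
/// conn.export_keying_material(&mut key, b"EXPERIMENTAL my-app key", b"session-1")?;
/// ```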
752 pub fn export_keying_material(
753 &self,
754 output: &mut [u8],
755 label: &[u8],
756 context: &[u8],
757 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
758 self.0
759 .state
760 .lock("export_keying_material")
761 .inner
762 .crypto_session()
763 .export_keying_material(output, label, context)
764 }
765
766 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
767 ///
768 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
769 /// `count`s increase both minimum and worst-case memory consumption.
770 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
771 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
772 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
773 // May need to send MAX_STREAMS to make progress
774 conn.wake();
775 }
776
777 /// See [`crate::TransportConfig::receive_window()`]
778 pub fn set_receive_window(&self, receive_window: VarInt) {
779 let mut conn = self.0.state.lock("set_receive_window");
780 conn.inner.set_receive_window(receive_window);
781 conn.wake();
782 }
783
784 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
785 ///
786 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
787 /// `count`s increase both minimum and worst-case memory consumption.
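///
/// A sketch of raising the limit at runtime (assuming `conn` is a [`Connection`]; the value is
/// illustrative):
///
/// ```ignore
/// // Allow the peer to keep up to 256 bidirectional streams open concurrently.
/// conn.set_max_concurrent_bi_streams(256u32.into());
/// ```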
788 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
789 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
790 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
791 // May need to send MAX_STREAMS to make progress
792 conn.wake();
793 }
794
795 /// Set up qlog for this connection.
796 #[cfg(feature = "__qlog")]
797 pub fn set_qlog(
798 &mut self,
799 writer: Box<dyn std::io::Write + Send + Sync>,
800 title: Option<String>,
801 description: Option<String>,
802 ) {
803 let mut state = self.0.state.lock("__qlog");
804 state
805 .inner
806 .set_qlog(writer, title, description, Instant::now());
807 }
808}
809
810pin_project! {
811 /// Future produced by [`Connection::open_uni`]
812 pub struct OpenUni<'a> {
813 conn: &'a ConnectionRef,
814 #[pin]
815 notify: Notified<'a>,
816 }
817}
818
819impl Future for OpenUni<'_> {
820 type Output = Result<SendStream, ConnectionError>;
821 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
822 let this = self.project();
823 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
824 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
825 }
826}
827
828pin_project! {
829 /// Future produced by [`Connection::open_bi`]
830 pub struct OpenBi<'a> {
831 conn: &'a ConnectionRef,
832 #[pin]
833 notify: Notified<'a>,
834 }
835}
836
837impl Future for OpenBi<'_> {
838 type Output = Result<(SendStream, RecvStream), ConnectionError>;
839 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
840 let this = self.project();
841 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
842
843 Poll::Ready(Ok((
844 SendStream::new(conn.clone(), id, is_0rtt),
845 RecvStream::new(conn, id, is_0rtt),
846 )))
847 }
848}
849
850fn poll_open<'a>(
851 ctx: &mut Context<'_>,
852 conn: &'a ConnectionRef,
853 mut notify: Pin<&mut Notified<'a>>,
854 dir: Dir,
855) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
856 let mut state = conn.state.lock("poll_open");
857 if let Some(ref e) = state.error {
858 return Poll::Ready(Err(e.clone()));
859 } else if let Some(id) = state.inner.streams().open(dir) {
860 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
861 drop(state); // Release the lock so clone can take it
862 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
863 }
864 loop {
865 match notify.as_mut().poll(ctx) {
866 // `state` lock ensures we didn't race with readiness
867 Poll::Pending => return Poll::Pending,
868 // Spurious wakeup, get a new future
869 Poll::Ready(()) => {
870 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
871 }
872 }
873 }
874}
875
876pin_project! {
877 /// Future produced by [`Connection::accept_uni`]
878 pub struct AcceptUni<'a> {
879 conn: &'a ConnectionRef,
880 #[pin]
881 notify: Notified<'a>,
882 }
883}
884
885impl Future for AcceptUni<'_> {
886 type Output = Result<RecvStream, ConnectionError>;
887
888 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
889 let this = self.project();
890 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
891 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
892 }
893}
894
895pin_project! {
896 /// Future produced by [`Connection::accept_bi`]
897 pub struct AcceptBi<'a> {
898 conn: &'a ConnectionRef,
899 #[pin]
900 notify: Notified<'a>,
901 }
902}
903
904impl Future for AcceptBi<'_> {
905 type Output = Result<(SendStream, RecvStream), ConnectionError>;
906
907 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
908 let this = self.project();
909 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
910 Poll::Ready(Ok((
911 SendStream::new(conn.clone(), id, is_0rtt),
912 RecvStream::new(conn, id, is_0rtt),
913 )))
914 }
915}
916
917fn poll_accept<'a>(
918 ctx: &mut Context<'_>,
919 conn: &'a ConnectionRef,
920 mut notify: Pin<&mut Notified<'a>>,
921 dir: Dir,
922) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
923 let mut state = conn.state.lock("poll_accept");
924 // Check for incoming streams before checking `state.error` so that already-received streams,
925 // which are necessarily finite, can be drained from a closed connection.
926 if let Some(id) = state.inner.streams().accept(dir) {
927 let is_0rtt = state.inner.is_handshaking();
928 state.wake(); // To send additional stream ID credit
929 drop(state); // Release the lock so clone can take it
930 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
931 } else if let Some(ref e) = state.error {
932 return Poll::Ready(Err(e.clone()));
933 }
934 loop {
935 match notify.as_mut().poll(ctx) {
936 // `state` lock ensures we didn't race with readiness
937 Poll::Pending => return Poll::Pending,
938 // Spurious wakeup, get a new future
939 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
940 }
941 }
942}
943
944pin_project! {
945 /// Future produced by [`Connection::read_datagram`]
946 pub struct ReadDatagram<'a> {
947 conn: &'a ConnectionRef,
948 #[pin]
949 notify: Notified<'a>,
950 }
951}
952
953impl Future for ReadDatagram<'_> {
954 type Output = Result<Bytes, ConnectionError>;
955 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
956 let mut this = self.project();
957 let mut state = this.conn.state.lock("ReadDatagram::poll");
958 // Check for buffered datagrams before checking `state.error` so that already-received
959 // datagrams, which are necessarily finite, can be drained from a closed connection.
960 match state.inner.datagrams().recv() {
961 Some(x) => {
962 return Poll::Ready(Ok(x));
963 }
964 _ => {
965 if let Some(ref e) = state.error {
966 return Poll::Ready(Err(e.clone()));
967 }
968 }
969 }
970 loop {
971 match this.notify.as_mut().poll(ctx) {
972 // `state` lock ensures we didn't race with readiness
973 Poll::Pending => return Poll::Pending,
974 // Spurious wakeup, get a new future
975 Poll::Ready(()) => this
976 .notify
977 .set(this.conn.shared.datagram_received.notified()),
978 }
979 }
980 }
981}
982
983pin_project! {
984 /// Future produced by [`Connection::send_datagram_wait`]
985 pub struct SendDatagram<'a> {
986 conn: &'a ConnectionRef,
987 data: Option<Bytes>,
988 #[pin]
989 notify: Notified<'a>,
990 }
991}
992
993impl Future for SendDatagram<'_> {
994 type Output = Result<(), SendDatagramError>;
995 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
996 let mut this = self.project();
997 let mut state = this.conn.state.lock("SendDatagram::poll");
998 if let Some(ref e) = state.error {
999 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1000 }
1001 use crate::SendDatagramError::*;
1002 match state.inner.datagrams().send(
1003 this.data.take().ok_or_else(|| {
1004 error!("SendDatagram future polled without data");
1005 SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
1006 })?,
1007 false,
1008 ) {
1009 Ok(()) => {
1010 state.wake();
1011 Poll::Ready(Ok(()))
1012 }
1013 Err(e) => Poll::Ready(Err(match e {
1014 Blocked(data) => {
1015 this.data.replace(data);
1016 loop {
1017 match this.notify.as_mut().poll(ctx) {
1018 Poll::Pending => return Poll::Pending,
1019 // Spurious wakeup, get a new future
1020 Poll::Ready(()) => this
1021 .notify
1022 .set(this.conn.shared.datagrams_unblocked.notified()),
1023 }
1024 }
1025 }
1026 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1027 Disabled => SendDatagramError::Disabled,
1028 TooLarge => SendDatagramError::TooLarge,
1029 })),
1030 }
1031 }
1032}
1033
1034#[derive(Debug)]
1035pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1036
1037impl ConnectionRef {
1038 #[allow(clippy::too_many_arguments)]
1039 fn new(
1040 handle: ConnectionHandle,
1041 conn: crate::Connection,
1042 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1043 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1044 on_handshake_data: oneshot::Sender<()>,
1045 on_connected: oneshot::Sender<bool>,
1046 socket: Arc<dyn AsyncUdpSocket>,
1047 runtime: Arc<dyn Runtime>,
1048 ) -> Self {
1049 Self(Arc::new(ConnectionInner {
1050 state: Mutex::new(State {
1051 inner: conn,
1052 driver: None,
1053 handle,
1054 on_handshake_data: Some(on_handshake_data),
1055 on_connected: Some(on_connected),
1056 connected: false,
1057 timer: None,
1058 timer_deadline: None,
1059 conn_events,
1060 endpoint_events,
1061 blocked_writers: FxHashMap::default(),
1062 blocked_readers: FxHashMap::default(),
1063 stopped: FxHashMap::default(),
1064 error: None,
1065 ref_count: 0,
1066 io_poller: socket.clone().create_io_poller(),
1067 socket,
1068 runtime,
1069 send_buffer: Vec::new(),
1070 buffered_transmit: None,
1071 binding_started: false,
1072 }),
1073 shared: Shared::default(),
1074 }))
1075 }
1076
1077 fn stable_id(&self) -> usize {
1078 &*self.0 as *const _ as usize
1079 }
1080}
1081
1082impl Clone for ConnectionRef {
1083 fn clone(&self) -> Self {
1084 self.state.lock("clone").ref_count += 1;
1085 Self(self.0.clone())
1086 }
1087}
1088
1089impl Drop for ConnectionRef {
1090 fn drop(&mut self) {
1091 let conn = &mut *self.state.lock("drop");
1092 if let Some(x) = conn.ref_count.checked_sub(1) {
1093 conn.ref_count = x;
1094 if x == 0 && !conn.inner.is_closed() {
1095 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1096 // not, we can't do any harm. If there were any streams being opened, then either
1097 // the connection will be closed for an unrelated reason or a fresh reference will
1098 // be constructed for the newly opened stream.
1099 conn.implicit_close(&self.shared);
1100 }
1101 }
1102 }
1103}
1104
1105impl std::ops::Deref for ConnectionRef {
1106 type Target = ConnectionInner;
1107 fn deref(&self) -> &Self::Target {
1108 &self.0
1109 }
1110}
1111
1112#[derive(Debug)]
1113pub(crate) struct ConnectionInner {
1114 pub(crate) state: Mutex<State>,
1115 pub(crate) shared: Shared,
1116}
1117
1118#[derive(Debug, Default)]
1119pub(crate) struct Shared {
1120 /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1121 /// control budget
1122 stream_budget_available: [Notify; 2],
1123 /// Notified when the peer has initiated a new stream
1124 stream_incoming: [Notify; 2],
1125 datagram_received: Notify,
1126 datagrams_unblocked: Notify,
1127 closed: Notify,
1128}
1129
1130pub(crate) struct State {
1131 pub(crate) inner: crate::Connection,
1132 driver: Option<Waker>,
1133 handle: ConnectionHandle,
1134 on_handshake_data: Option<oneshot::Sender<()>>,
1135 on_connected: Option<oneshot::Sender<bool>>,
1136 connected: bool,
1137 timer: Option<Pin<Box<dyn AsyncTimer>>>,
1138 timer_deadline: Option<Instant>,
1139 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1140 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1141 pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1142 pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1143 pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1144 /// Always set to Some before the connection becomes drained
1145 pub(crate) error: Option<ConnectionError>,
1146 /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1147 ref_count: usize,
1148 socket: Arc<dyn AsyncUdpSocket>,
1149 io_poller: Pin<Box<dyn UdpPoller>>,
1150 runtime: Arc<dyn Runtime>,
1151 send_buffer: Vec<u8>,
1152 /// We buffer a transmit when the underlying I/O would block
1153 buffered_transmit: Option<crate::Transmit>,
1154 /// True once we've initiated automatic channel binding (if enabled)
1155 binding_started: bool,
1156}
1157
1158impl State {
1159 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1160 let now = self.runtime.now();
1161 let mut transmits = 0;
1162
1163 let max_datagrams = self
1164 .socket
1165 .max_transmit_segments()
1166 .min(MAX_TRANSMIT_SEGMENTS);
1167
1168 loop {
1169 // Retry the last transmit, or get a new one.
1170 let t = match self.buffered_transmit.take() {
1171 Some(t) => t,
1172 None => {
1173 self.send_buffer.clear();
1174 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1175 match self
1176 .inner
1177 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1178 {
1179 Some(t) => {
1180 transmits += match t.segment_size {
1181 None => 1,
1182 Some(s) => t.size.div_ceil(s), // round up
1183 };
1184 t
1185 }
1186 None => break,
1187 }
1188 }
1189 };
1190
1191 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1192 // Retry after a future wakeup
1193 self.buffered_transmit = Some(t);
1194 return Ok(false);
1195 }
1196
1197 let len = t.size;
1198 let retry = match self
1199 .socket
1200 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1201 {
1202 Ok(()) => false,
1203 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1204 Err(e) => return Err(e),
1205 };
1206 if retry {
1207 // We thought the socket was writable, but it wasn't. Retry so that either another
1208 // `poll_writable` call determines that the socket is indeed not writable and
1209 // registers us for a wakeup, or the send succeeds if this really was just a
1210 // transient failure.
1211 self.buffered_transmit = Some(t);
1212 continue;
1213 }
1214
1215 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1216 // TODO: What isn't ideal here yet is that if we don't poll all
1217 // datagrams that could be sent we don't go into the `app_limited`
1218 // state and CWND continues to grow until we get here the next time.
1219 // See https://github.com/quinn-rs/quinn/issues/1126
1220 return Ok(true);
1221 }
1222 }
1223
1224 Ok(false)
1225 }
1226
1227 fn forward_endpoint_events(&mut self) {
1228 while let Some(event) = self.inner.poll_endpoint_events() {
1229 // If the endpoint driver is gone, noop.
1230 let _ = self.endpoint_events.send((self.handle, event));
1231 }
1232 }
1233
1234 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1235 fn process_conn_events(
1236 &mut self,
1237 shared: &Shared,
1238 cx: &mut Context,
1239 ) -> Result<(), ConnectionError> {
1240 loop {
1241 match self.conn_events.poll_recv(cx) {
1242 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1243 self.socket = socket;
1244 self.io_poller = self.socket.clone().create_io_poller();
1245 self.inner.local_address_changed();
1246 }
1247 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1248 self.inner.handle_event(event);
1249 }
1250 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1251 self.close(error_code, reason, shared);
1252 }
1253 Poll::Ready(None) => {
1254 return Err(ConnectionError::TransportError(crate::TransportError {
1255 code: crate::TransportErrorCode::INTERNAL_ERROR,
1256 frame: None,
1257 reason: "endpoint driver future was dropped".to_string(),
1258 }));
1259 }
1260 Poll::Pending => {
1261 return Ok(());
1262 }
1263 }
1264 }
1265 }
1266
1267 fn forward_app_events(&mut self, shared: &Shared) {
1268 while let Some(event) = self.inner.poll() {
1269 use crate::Event::*;
1270 match event {
1271 HandshakeDataReady => {
1272 if let Some(x) = self.on_handshake_data.take() {
1273 let _ = x.send(());
1274 }
1275 }
1276 Connected => {
1277 self.connected = true;
1278 if let Some(x) = self.on_connected.take() {
1279 // We don't care if the on-connected future was dropped
1280 let _ = x.send(self.inner.accepted_0rtt());
1281 }
1282 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1283 // Wake up rejected 0-RTT streams so they can fail immediately with
1284 // `ZeroRttRejected` errors.
1285 wake_all(&mut self.blocked_writers);
1286 wake_all(&mut self.blocked_readers);
1287 wake_all_notify(&mut self.stopped);
1288 }
1289 }
1290 ConnectionLost { reason } => {
1291 self.terminate(reason, shared);
1292 }
1293 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1294 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1295 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1296 }
1297 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1298 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1299 }
1300 DatagramReceived => {
1301 shared.datagram_received.notify_waiters();
1302 }
1303 DatagramsUnblocked => {
1304 shared.datagrams_unblocked.notify_waiters();
1305 }
1306 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1307 Stream(StreamEvent::Available { dir }) => {
1308 // Might mean any number of streams are ready, so we wake up everyone
1309 shared.stream_budget_available[dir as usize].notify_waiters();
1310 }
1311 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1312 Stream(StreamEvent::Stopped { id, .. }) => {
1313 wake_stream_notify(id, &mut self.stopped);
1314 wake_stream(id, &mut self.blocked_writers);
1315 }
1316 }
1317 }
1318 }
1319
1320 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1321 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1322 // timer is registered with the runtime (and check whether it's already
1323 // expired).
1324 match self.inner.poll_timeout() {
1325 Some(deadline) => {
1326 if let Some(delay) = &mut self.timer {
1327 // There is no need to reset the tokio timer if the deadline
1328 // did not change
1329 if self
1330 .timer_deadline
1331 .map(|current_deadline| current_deadline != deadline)
1332 .unwrap_or(true)
1333 {
1334 delay.as_mut().reset(deadline);
1335 }
1336 } else {
1337 self.timer = Some(self.runtime.new_timer(deadline));
1338 }
1339 // Store the actual expiration time of the timer
1340 self.timer_deadline = Some(deadline);
1341 }
1342 None => {
1343 self.timer_deadline = None;
1344 return false;
1345 }
1346 }
1347
1348 if self.timer_deadline.is_none() {
1349 return false;
1350 }
1351
1352 let delay = match self.timer.as_mut() {
1353 Some(timer) => timer.as_mut(),
1354 None => {
1355 error!("Timer missing in state where it should exist");
1356 return false;
1357 }
1358 };
1359 if delay.poll(cx).is_pending() {
1360 // Since there wasn't a timeout event, there is nothing new
1361 // for the connection to do
1362 return false;
1363 }
1364
1365 // A timer expired, so the caller needs to check for
1366 // new transmits, which might cause new timers to be set.
1367 self.inner.handle_timeout(self.runtime.now());
1368 self.timer_deadline = None;
1369 true
1370 }
1371
1372 /// Wake up a blocked `Driver` task to process I/O
1373 pub(crate) fn wake(&mut self) {
1374 if let Some(x) = self.driver.take() {
1375 x.wake();
1376 }
1377 }
1378
1379 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1380 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1381 self.error = Some(reason.clone());
1382 if let Some(x) = self.on_handshake_data.take() {
1383 let _ = x.send(());
1384 }
1385 wake_all(&mut self.blocked_writers);
1386 wake_all(&mut self.blocked_readers);
1387 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1388 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1389 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1390 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1391 shared.datagram_received.notify_waiters();
1392 shared.datagrams_unblocked.notify_waiters();
1393 if let Some(x) = self.on_connected.take() {
1394 let _ = x.send(false);
1395 }
1396 wake_all_notify(&mut self.stopped);
1397 shared.closed.notify_waiters();
1398 }
1399
1400 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1401 self.inner.close(self.runtime.now(), error_code, reason);
1402 self.terminate(ConnectionError::LocallyClosed, shared);
1403 self.wake();
1404 }
1405
1406 /// Close for a reason other than the application's explicit request
1407 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1408 self.close(0u32.into(), Bytes::new(), shared);
1409 }
1410
1411 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1412 if self.inner.is_handshaking()
1413 || self.inner.accepted_0rtt()
1414 || self.inner.side().is_server()
1415 {
1416 Ok(())
1417 } else {
1418 Err(())
1419 }
1420 }
1421}
1422
1423impl Drop for State {
1424 fn drop(&mut self) {
1425 if !self.inner.is_drained() {
1426 // Ensure the endpoint can tidy up
1427 let _ = self
1428 .endpoint_events
1429 .send((self.handle, crate::EndpointEvent::drained()));
1430 }
1431 }
1432}
1433
1434impl fmt::Debug for State {
1435 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1436 f.debug_struct("State").field("inner", &self.inner).finish()
1437 }
1438}
1439
1440fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1441 if let Some(waker) = wakers.remove(&stream_id) {
1442 waker.wake();
1443 }
1444}
1445
1446fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1447 wakers.drain().for_each(|(_, waker)| waker.wake())
1448}
1449
1450fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1451 if let Some(notify) = wakers.remove(&stream_id) {
1452 notify.notify_waiters()
1453 }
1454}
1455
1456fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1457 wakers
1458 .drain()
1459 .for_each(|(_, notify)| notify.notify_waiters())
1460}
1461
1462/// Errors that can arise when sending a datagram
1463#[derive(Debug, Error, Clone, Eq, PartialEq)]
1464pub enum SendDatagramError {
1465 /// The peer does not support receiving datagram frames
1466 #[error("datagrams not supported by peer")]
1467 UnsupportedByPeer,
1468 /// Datagram support is disabled locally
1469 #[error("datagram support disabled")]
1470 Disabled,
1471 /// The datagram is larger than the connection can currently accommodate
1472 ///
1473 /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1474 /// exceeded.
1475 #[error("datagram too large")]
1476 TooLarge,
1477 /// The connection was lost
1478 #[error("connection lost")]
1479 ConnectionLost(#[from] ConnectionError),
1480}
1481
1482 /// The maximum number of datagrams that will be produced in a single `drive_transmit` call
1483///
1484/// This limits the amount of CPU resources consumed by datagram generation,
1485/// and allows other tasks (like receiving ACKs) to run in between.
1486const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1487
1488 /// The maximum number of datagrams that are sent in a single transmit
1489///
1490/// This can be lower than the maximum platform capabilities, to avoid excessive
1491/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1492/// that numbers around 10 are a good compromise.
1493const MAX_TRANSMIT_SEGMENTS: usize = 10;