ant_quic/high_level/connection.rs

use std::{
    any::Any,
    fmt,
    future::Future,
    io,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker, ready},
};

use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
use tracing::{Instrument, Span, debug_span};

use super::{
    ConnectionEvent,
    mutex::Mutex,
    recv_stream::RecvStream,
    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
    send_stream::SendStream,
    udp_transmit,
};
use crate::{
    ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent, Instant,
    Side, StreamEvent, StreamId, VarInt, congestion::Controller,
};

/// In-progress connection attempt future
#[derive(Debug)]
pub struct Connecting {
    conn: Option<ConnectionRef>,
    connected: oneshot::Receiver<bool>,
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}

impl Connecting {
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();
        let conn = ConnectionRef::new(
            handle,
            conn,
            endpoint_events,
            conn_events,
            on_handshake_data_send,
            on_connected_send,
            socket,
            runtime.clone(),
        );

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    tracing::error!("I/O error: {e}");
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
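    ///
    /// # Example
    ///
    /// A minimal client-side sketch (not compiled as a doctest; `endpoint.connect(...)` and the
    /// stream I/O methods are assumed from the surrounding high-level API):
    ///
    /// ```ignore
    /// let connecting = endpoint.connect(server_addr, "example.com")?;
    /// match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // Data sent here is 0-RTT: it may be replayed, so keep it idempotent.
    ///         let mut stream = connection.open_uni().await?;
    ///         stream.write_all(b"GET /index.html").await?;
    ///         if !zero_rtt_accepted.await {
    ///             // The server rejected 0-RTT: the early sends were never delivered, and
    ///             // further operations on those streams fail with `ZeroRttRejected`.
    ///         }
    ///     }
    ///     Err(connecting) => {
    ///         // No resumption state was available; fall back to the full handshake.
    ///         let connection = connecting.await?;
    ///     }
    /// }
    /// ```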
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");

        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            match self.conn.take() {
                Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
                None => {
                    tracing::error!("Connection state missing during 0-RTT acceptance");
                    Err(self)
                }
            }
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
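    ///
    /// A sketch of inspecting handshake data with the default `rustls` session (not compiled as
    /// a doctest; the downcast target is the type named above, and `server_name` is assumed to
    /// be one of its fields):
    ///
    /// ```ignore
    /// let data = connecting.handshake_data().await?;
    /// if let Ok(data) = data.downcast::<crate::crypto::rustls::HandshakeData>() {
    ///     // e.g. inspect the server name requested via SNI
    ///     println!("SNI: {:?}", data.server_name);
    /// }
    /// ```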
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().ok_or_else(|| {
            tracing::error!("Connection state missing while retrieving handshake data");
            ConnectionError::LocallyClosed
        })?;
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                inner
                    .error
                    .clone()
                    .expect("spurious handshake data ready notification")
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> SocketAddr {
        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
        conn_ref.state.lock("remote_address").inner.remote_address()
    }
}

impl Future for Connecting {
    type Output = Result<Connection, ConnectionError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.connected).poll(cx).map(|_| {
            let conn = self.conn.take().unwrap();
            let inner = conn.state.lock("connecting");
            if inner.connected {
                drop(inner);
                Ok(Connection(conn))
            } else {
                Err(inner
                    .error
                    .clone()
                    .expect("connected signaled without connection success or error"))
            }
        })
    }
}

/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);

impl Future for ZeroRttAccepted {
    type Output = bool;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
    }
}

/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events between the
/// `Connection` API object, the `Endpoint` task, and the related stream interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}

/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, incoming
/// stream futures, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);

impl Connection {
    /// Initiate a new outgoing unidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used.
    pub fn open_uni(&self) -> OpenUni<'_> {
        OpenUni {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
        }
    }

    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
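    ///
    /// A usage sketch (not compiled as a doctest; `write_all`, `finish`, and `read_to_end` are
    /// assumed from the stream types):
    ///
    /// ```ignore
    /// let (mut send, mut recv) = connection.open_bi().await?;
    /// // Write first: the peer only learns about the stream once data is sent on it.
    /// send.write_all(b"hello").await?;
    /// send.finish()?;
    /// let reply = recv.read_to_end(64 * 1024).await?;
    /// ```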
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }

    /// Accept the next incoming uni-directional stream
    pub fn accept_uni(&self) -> AcceptUni<'_> {
        AcceptUni {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
        }
    }

    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to [`accept_bi()`]. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: crate::Connection::accept_bi
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
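    ///
    /// A server-side accept-loop sketch (not compiled as a doctest; stream I/O methods are
    /// assumed from the stream types, and `response_for` is a hypothetical application handler):
    ///
    /// ```ignore
    /// while let Ok((mut send, mut recv)) = connection.accept_bi().await {
    ///     // The stream only becomes visible here after the peer has written to it.
    ///     let request = recv.read_to_end(64 * 1024).await?;
    ///     send.write_all(&response_for(&request)).await?;
    ///     send.finish()?;
    /// }
    /// ```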
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
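    ///
    /// A receive-loop sketch (not compiled as a doctest; `handle` is a hypothetical application
    /// callback):
    ///
    /// ```ignore
    /// // Drain datagrams until the connection closes; already-received datagrams can still be
    /// // read from a closed connection, after which this yields the close reason.
    /// while let Ok(datagram) = connection.read_datagram().await {
    ///     handle(datagram);
    /// }
    /// ```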
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            notify: self.0.shared.datagram_received.notified(),
        }
    }

    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
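    ///
    /// A sketch of observing closure alongside other work (not compiled as a doctest;
    /// `tokio::select!` is just one way to race the two futures, and `serve` is a hypothetical
    /// application task):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     reason = connection.closed() => {
    ///         // Includes routine cases such as `LocallyClosed` and `ApplicationClosed`.
    ///         println!("connection closed: {reason}");
    ///     }
    ///     result = serve(&connection) => { result?; }
    /// }
    /// ```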
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }

    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    ///
    /// The sending side can not guarantee all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`] the remote endpoint may drop any data it received but is as yet
    /// undelivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
    /// [`close()`]: Connection::close
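    ///
    /// A graceful-shutdown sketch for the side that received the last application data (not
    /// compiled as a doctest; `endpoint.wait_idle()` is the method referenced above):
    ///
    /// ```ignore
    /// // Only the last receiver knows all data arrived, so it initiates the close.
    /// connection.close(0u32.into(), b"done");
    /// // Give the CONNECTION_CLOSE frame a chance to reach the peer before dropping the endpoint.
    /// endpoint.wait_idle().await;
    /// ```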
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
    ///
    /// Previously queued datagrams which are still unsent may be discarded to make space for this
    /// datagram, in order of oldest to newest.
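    ///
    /// A sketch that checks the current limit before sending (not compiled as a doctest):
    ///
    /// ```ignore
    /// let payload = Bytes::from_static(b"ping");
    /// // `max_datagram_size` is `None` when datagrams are unsupported or disabled.
    /// if connection.max_datagram_size().is_some_and(|max| payload.len() <= max) {
    ///     connection.send_datagram(payload)?;
    /// }
    /// ```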
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use crate::SendDatagramError::*;
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }

    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }

    /// The side of the connection (client or server)
    pub fn side(&self) -> Side {
        self.0.state.lock("side").inner.side()
    }

    /// The peer's UDP address
    ///
    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
    /// switching to a cellular internet connection.
    pub fn remote_address(&self) -> SocketAddr {
        self.0.state.lock("remote_address").inner.remote_address()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self) -> Duration {
        self.0.state.lock("rtt").inner.rtt()
    }

    /// Returns connection statistics
    pub fn stats(&self) -> ConnectionStats {
        self.0.state.lock("stats").inner.stats()
    }

    /// Current state of the congestion control algorithm, for debugging purposes
    pub fn congestion_state(&self) -> Box<dyn Controller> {
        self.0
            .state
            .lock("congestion_state")
            .inner
            .congestion_state()
            .clone_box()
    }

    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }

    /// Cryptographic identity of the peer
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("peer_identity")
            .inner
            .crypto_session()
            .peer_identity()
    }

    /// A stable identifier for this connection
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection.
    pub fn stable_id(&self) -> usize {
        self.0.stable_id()
    }

    /// Update traffic keys spontaneously
    ///
    /// This primarily exists for testing purposes.
    pub fn force_key_update(&self) {
        self.0
            .state
            .lock("force_key_update")
            .inner
            .force_key_update()
    }

    /// Derive keying material from this connection's TLS session secrets.
    ///
    /// When both peers call this method with the same `label` and `context`
    /// arguments and `output` buffers of equal length, they will get the
    /// same sequence of bytes in `output`. These bytes are cryptographically
    /// strong and pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
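    ///
    /// A sketch of deriving a shared 32-byte key on both peers (not compiled as a doctest; the
    /// label and context values are illustrative):
    ///
    /// ```ignore
    /// let mut key = [0u8; 32];
    /// connection.export_keying_material(&mut key, b"my-app key", b"session 1")?;
    /// // The peer runs the same call with identical arguments and obtains identical bytes.
    /// ```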
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
        self.0
            .state
            .lock("export_keying_material")
            .inner
            .crypto_session()
            .export_keying_material(output, label, context)
    }

    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
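    ///
    /// A sketch of raising the limit at runtime (not compiled as a doctest; `42` is an arbitrary
    /// value):
    ///
    /// ```ignore
    /// connection.set_max_concurrent_uni_streams(42u32.into());
    /// ```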
    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// See [`crate::TransportConfig::receive_window()`]
    pub fn set_receive_window(&self, receive_window: VarInt) {
        let mut conn = self.0.state.lock("set_receive_window");
        conn.inner.set_receive_window(receive_window);
        conn.wake();
    }

    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// Set up qlog for this connection.
    #[cfg(feature = "__qlog")]
    pub fn set_qlog(
        &mut self,
        writer: Box<dyn std::io::Write + Send + Sync>,
        title: Option<String>,
        description: Option<String>,
    ) {
        let mut state = self.0.state.lock("__qlog");
        state
            .inner
            .set_qlog(writer, title, description, Instant::now());
    }
}

pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenUni<'_> {
    type Output = Result<SendStream, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptUni<'_> {
    type Output = Result<RecvStream, ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        match state.inner.datagrams().recv() {
            Some(x) => {
                return Poll::Ready(Ok(x));
            }
            _ => {
                if let Some(ref e) = state.error {
                    return Poll::Ready(Err(e.clone()));
                }
            }
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        conn: &'a ConnectionRef,
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use crate::SendDatagramError::*;
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

impl ConnectionRef {
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        on_handshake_data: oneshot::Sender<()>,
        on_connected: oneshot::Sender<bool>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        Self(Arc::new(ConnectionInner {
            state: Mutex::new(State {
                inner: conn,
                driver: None,
                handle,
                on_handshake_data: Some(on_handshake_data),
                on_connected: Some(on_connected),
                connected: false,
                timer: None,
                timer_deadline: None,
                conn_events,
                endpoint_events,
                blocked_writers: FxHashMap::default(),
                blocked_readers: FxHashMap::default(),
                stopped: FxHashMap::default(),
                error: None,
                ref_count: 0,
                io_poller: socket.clone().create_io_poller(),
                socket,
                runtime,
                send_buffer: Vec::new(),
                buffered_transmit: None,
            }),
            shared: Shared::default(),
        }))
    }

    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}

impl Clone for ConnectionRef {
    fn clone(&self) -> Self {
        self.state.lock("clone").ref_count += 1;
        Self(self.0.clone())
    }
}

impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}

impl std::ops::Deref for ConnectionRef {
    type Target = ConnectionInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionInner {
    pub(crate) state: Mutex<State>,
    pub(crate) shared: Shared,
}

#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    datagram_received: Notify,
    datagrams_unblocked: Notify,
    closed: Notify,
}

pub(crate) struct State {
    pub(crate) inner: crate::Connection,
    driver: Option<Waker>,
    handle: ConnectionHandle,
    on_handshake_data: Option<oneshot::Sender<()>>,
    on_connected: Option<oneshot::Sender<bool>>,
    connected: bool,
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    socket: Arc<dyn AsyncUdpSocket>,
    io_poller: Pin<Box<dyn UdpPoller>>,
    runtime: Arc<dyn Runtime>,
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<crate::Transmit>,
}

impl State {
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

        let max_datagrams = self
            .socket
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
                // Retry after a future wakeup
                self.buffered_transmit = Some(t);
                return Ok(false);
            }

            let len = t.size;
            let retry = match self
                .socket
                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
            {
                Ok(()) => false,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
                Err(e) => return Err(e),
            };
            if retry {
                // We thought the socket was writable, but it wasn't. Retry so that either another
                // `poll_writable` call determines that the socket is indeed not writable and
                // registers us for a wakeup, or the send succeeds if this really was just a
                // transient failure.
                self.buffered_transmit = Some(t);
                continue;
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }

    fn forward_endpoint_events(&mut self) {
        while let Some(event) = self.inner.poll_endpoint_events() {
            // If the endpoint driver is gone, noop.
            let _ = self.endpoint_events.send((self.handle, event));
        }
    }

    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
                    self.socket = socket;
                    self.io_poller = self.socket.clone().create_io_poller();
                    self.inner.local_address_changed();
                }
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    self.inner.handle_event(event);
                }
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                Poll::Ready(None) => {
                    return Err(ConnectionError::TransportError(crate::TransportError {
                        code: crate::TransportErrorCode::INTERNAL_ERROR,
                        frame: None,
                        reason: "endpoint driver future was dropped".to_string(),
                    }));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }

    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use crate::Event::*;
            match event {
                HandshakeDataReady => {
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all_notify(&mut self.stopped);
                    }
                }
                ConnectionLost { reason } => {
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    wake_stream_notify(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
            }
        }
    }

    fn drive_timer(&mut self, cx: &mut Context) -> bool {
        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
        // timer is registered with the runtime (and check whether it's already
        // expired).
        match self.inner.poll_timeout() {
            Some(deadline) => {
                if let Some(delay) = &mut self.timer {
                    // There is no need to reset the tokio timer if the deadline
                    // did not change
                    if self
                        .timer_deadline
                        .map(|current_deadline| current_deadline != deadline)
                        .unwrap_or(true)
                    {
                        delay.as_mut().reset(deadline);
                    }
                } else {
                    self.timer = Some(self.runtime.new_timer(deadline));
                }
                // Store the actual expiration time of the timer
                self.timer_deadline = Some(deadline);
            }
            None => {
                self.timer_deadline = None;
                return false;
            }
        }

        if self.timer_deadline.is_none() {
            return false;
        }

        let delay = self
            .timer
            .as_mut()
            .expect("timer must exist in this state")
            .as_mut();
        if delay.poll(cx).is_pending() {
            // Since there wasn't a timeout event, there is nothing new
            // for the connection to do
            return false;
        }

        // A timer expired, so the caller needs to check for
        // new transmits, which might cause new timers to be set.
        self.inner.handle_timeout(self.runtime.now());
        self.timer_deadline = None;
        true
    }

    /// Wake up a blocked `Driver` task to process I/O
    pub(crate) fn wake(&mut self) {
        if let Some(x) = self.driver.take() {
            x.wake();
        }
    }

    /// Used to wake up all blocked futures when the connection becomes closed for any reason
    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
        self.error = Some(reason.clone());
        if let Some(x) = self.on_handshake_data.take() {
            let _ = x.send(());
        }
        wake_all(&mut self.blocked_writers);
        wake_all(&mut self.blocked_readers);
        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
        shared.datagram_received.notify_waiters();
        shared.datagrams_unblocked.notify_waiters();
        if let Some(x) = self.on_connected.take() {
            let _ = x.send(false);
        }
        wake_all_notify(&mut self.stopped);
        shared.closed.notify_waiters();
    }

    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        self.wake();
    }

    /// Close for a reason other than the application's explicit request
    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
        self.close(0u32.into(), Bytes::new(), shared);
    }

    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
        if self.inner.is_handshaking()
            || self.inner.accepted_0rtt()
            || self.inner.side().is_server()
        {
            Ok(())
        } else {
            Err(())
        }
    }
}

impl Drop for State {
    fn drop(&mut self) {
        if !self.inner.is_drained() {
            // Ensure the endpoint can tidy up
            let _ = self
                .endpoint_events
                .send((self.handle, crate::EndpointEvent::drained()));
        }
    }
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("State").field("inner", &self.inner).finish()
    }
}

fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
    if let Some(waker) = wakers.remove(&stream_id) {
        waker.wake();
    }
}

fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
    wakers.drain().for_each(|(_, waker)| waker.wake())
}

fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    if let Some(notify) = wakers.remove(&stream_id) {
        notify.notify_waiters()
    }
}

fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    wakers
        .drain()
        .for_each(|(_, notify)| notify.notify_waiters())
}

/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}

/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum amount of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;