iroh_quinn/
connection.rs

1use std::{
2    any::Any,
3    fmt,
4    future::Future,
5    io,
6    net::{IpAddr, SocketAddr},
7    pin::Pin,
8    sync::{Arc, Weak},
9    task::{Context, Poll, Waker},
10};
11
12use bytes::Bytes;
13use pin_project_lite::pin_project;
14use rustc_hash::FxHashMap;
15use thiserror::Error;
16use tokio::sync::{futures::Notified, mpsc, oneshot, watch, Notify};
17use tracing::{debug_span, Instrument, Span};
18
19use crate::{
20    mutex::Mutex,
21    recv_stream::RecvStream,
22    runtime::{AsyncTimer, Runtime, UdpSender},
23    send_stream::SendStream,
24    udp_transmit, ConnectionEvent, Duration, Instant, VarInt,
25};
26use proto::{
27    congestion::Controller, ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent,
28    StreamEvent, StreamId,
29};
30
/// In-progress connection attempt future
///
/// Awaiting it yields the [`Connection`] if the attempt succeeded, or the recorded
/// [`ConnectionError`] otherwise.
#[derive(Debug)]
pub struct Connecting {
    // Taken (left as `None`) when `into_0rtt` succeeds or when `poll` yields `Ready`.
    conn: Option<ConnectionRef>,
    // Polled by the `Future` impl; handed over to `ZeroRttAccepted` by `into_0rtt`.
    connected: oneshot::Receiver<bool>,
    // Taken and awaited by the first call to `handshake_data`.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
38
39impl Connecting {
40    pub(crate) fn new(
41        handle: ConnectionHandle,
42        conn: proto::Connection,
43        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
44        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
45        sender: Pin<Box<dyn UdpSender>>,
46        runtime: Arc<dyn Runtime>,
47    ) -> Self {
48        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
49        let (on_connected_send, on_connected_recv) = oneshot::channel();
50        let conn = ConnectionRef::new(
51            handle,
52            conn,
53            endpoint_events,
54            conn_events,
55            on_handshake_data_send,
56            on_connected_send,
57            sender,
58            runtime.clone(),
59        );
60
61        let driver = ConnectionDriver(conn.clone());
62        runtime.spawn(Box::pin(
63            async {
64                if let Err(e) = driver.await {
65                    tracing::error!("I/O error: {e}");
66                }
67            }
68            .instrument(Span::current()),
69        ));
70
71        Self {
72            conn: Some(conn),
73            connected: on_connected_recv,
74            handshake_data_ready: Some(on_handshake_data_recv),
75        }
76    }
77
78    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
79    ///
80    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
81    /// If so, the returned [`Connection`] can be used to send application data without waiting for
82    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
83    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
84    /// complete, at which point subsequently opened streams and written data will have full
85    /// cryptographic protection.
86    ///
87    /// ## Outgoing
88    ///
89    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
90    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
91    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
92    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
93    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
94    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
95    /// If it was rejected, the existence of streams opened and other application data sent prior
96    /// to the handshake completing will not be conveyed to the remote application, and local
97    /// operations on them will return `ZeroRttRejected` errors.
98    ///
99    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
100    /// relevant resumption state to be stored in the server, which servers may limit or lose for
101    /// various reasons including not persisting resumption state across server restarts.
102    ///
103    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
104    /// implementation's docs for 0-RTT pitfalls.
105    ///
106    /// ## Incoming
107    ///
108    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
109    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
110    ///
111    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
112    /// implementation's docs for 0-RTT pitfalls.
113    ///
114    /// ## Security
115    ///
116    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
117    /// replay attacks, and should therefore never invoke non-idempotent operations.
118    ///
119    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
120    /// before TLS client authentication has occurred, and should therefore not be used to send
121    /// data for which client authentication is being used.
122    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
123        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
124        // have to release it explicitly before returning `self` by value.
125        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");
126
127        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
128        drop(conn);
129
130        if is_ok {
131            let conn = self.conn.take().unwrap();
132            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
133        } else {
134            Err(self)
135        }
136    }
137
138    /// Parameters negotiated during the handshake
139    ///
140    /// The dynamic type returned is determined by the configured
141    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
142    /// be [`downcast`](Box::downcast) to a
143    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
144    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
145        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
146        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
147        // simple.
148        if let Some(x) = self.handshake_data_ready.take() {
149            let _ = x.await;
150        }
151        let conn = self.conn.as_ref().unwrap();
152        let inner = conn.state.lock("handshake");
153        inner
154            .inner
155            .crypto_session()
156            .handshake_data()
157            .ok_or_else(|| {
158                inner
159                    .error
160                    .clone()
161                    .expect("spurious handshake data ready notification")
162            })
163    }
164
165    /// The local IP address which was used when the peer established
166    /// the connection
167    ///
168    /// This can be different from the address the endpoint is bound to, in case
169    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
170    ///
171    /// This will return `None` for clients, or when the platform does not expose this
172    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
173    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
174    ///
175    /// Will panic if called after `poll` has returned `Ready`.
176    pub fn local_ip(&self) -> Option<IpAddr> {
177        let conn = self.conn.as_ref().unwrap();
178        let inner = conn.state.lock("local_ip");
179
180        inner.inner.local_ip()
181    }
182
183    /// The peer's UDP address
184    ///
185    /// Will panic if called after `poll` has returned `Ready`.
186    pub fn remote_address(&self) -> SocketAddr {
187        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
188        conn_ref.state.lock("remote_address").inner.remote_address()
189    }
190}
191
192impl Future for Connecting {
193    type Output = Result<Connection, ConnectionError>;
194    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
195        Pin::new(&mut self.connected).poll(cx).map(|_| {
196            let conn = self.conn.take().unwrap();
197            let inner = conn.state.lock("connecting");
198            if inner.connected {
199                drop(inner);
200                Ok(Connection(conn))
201            } else {
202                Err(inner
203                    .error
204                    .clone()
205                    .expect("connected signaled without connection success or error"))
206            }
207        })
208    }
209}
210
211/// Future that completes when a connection is fully established
212///
213/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
214/// value is meaningless.
215pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
216
217impl Future for ZeroRttAccepted {
218    type Output = bool;
219    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
220        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
221    }
222}
223
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// The future yields an error only when driving transmission fails with an I/O error. It
/// yields `Ok(())` once the connection has drained, or right after terminating the connection
/// because processing its events failed. Unlike other connection-related futures, this waits
/// for the draining period to complete to ensure that packets still in flight from the peer
/// are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);
237
impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    /// Run one round of connection work under the state lock: process connection events, drive
    /// transmission and the timer, forward events, then decide whether to reschedule, park,
    /// or complete.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The state lock is held for the whole poll.
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // A failure while processing connection events terminates the connection and completes
        // the driver with `Ok(())` rather than an I/O error.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        // `?` turns a transmit failure into this future's error output.
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Nothing left to do right now: store the waker in the connection state so the
                // driver can be woken later.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Drained: an error is expected to have been recorded by this point.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
273
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
// Newtype over `ConnectionRef`; the methods below reach the connection state through it.
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
291
292impl Connection {
293    /// Returns a weak reference to the inner connection struct.
294    pub fn weak_handle(&self) -> WeakConnectionHandle {
295        WeakConnectionHandle(Arc::downgrade(&self.0 .0))
296    }
297
298    /// Initiate a new outgoing unidirectional stream.
299    ///
300    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
301    /// consequence, the peer won't be notified that a stream has been opened until the stream is
302    /// actually used.
303    pub fn open_uni(&self) -> OpenUni<'_> {
304        OpenUni {
305            conn: &self.0,
306            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
307        }
308    }
309
310    /// Initiate a new outgoing bidirectional stream.
311    ///
312    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
313    /// consequence, the peer won't be notified that a stream has been opened until the stream is
314    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
315    /// anything to [`SendStream`] will never succeed.
316    ///
317    /// [`open_bi()`]: crate::Connection::open_bi
318    /// [`SendStream`]: crate::SendStream
319    /// [`RecvStream`]: crate::RecvStream
320    pub fn open_bi(&self) -> OpenBi<'_> {
321        OpenBi {
322            conn: &self.0,
323            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
324        }
325    }
326
327    /// Accept the next incoming uni-directional stream
328    pub fn accept_uni(&self) -> AcceptUni<'_> {
329        AcceptUni {
330            conn: &self.0,
331            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
332        }
333    }
334
335    /// Accept the next incoming bidirectional stream
336    ///
337    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
338    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
339    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
340    ///
341    /// [`accept_bi()`]: crate::Connection::accept_bi
342    /// [`open_bi()`]: crate::Connection::open_bi
343    /// [`SendStream`]: crate::SendStream
344    /// [`RecvStream`]: crate::RecvStream
345    pub fn accept_bi(&self) -> AcceptBi<'_> {
346        AcceptBi {
347            conn: &self.0,
348            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
349        }
350    }
351
352    /// Receive an application datagram
353    pub fn read_datagram(&self) -> ReadDatagram<'_> {
354        ReadDatagram {
355            conn: &self.0,
356            notify: self.0.shared.datagram_received.notified(),
357        }
358    }
359
360    /// Wait for the connection to be closed for any reason
361    ///
362    /// Despite the return type's name, closed connections are often not an error condition at the
363    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
364    /// and [`ConnectionError::ApplicationClosed`].
365    pub async fn closed(&self) -> ConnectionError {
366        {
367            let conn = self.0.state.lock("closed");
368            if let Some(error) = conn.error.as_ref() {
369                return error.clone();
370            }
371            // Construct the future while the lock is held to ensure we can't miss a wakeup if
372            // the `Notify` is signaled immediately after we release the lock. `await` it after
373            // the lock guard is out of scope.
374            self.0.shared.closed.notified()
375        }
376        .await;
377        self.0
378            .state
379            .lock("closed")
380            .error
381            .as_ref()
382            .expect("closed without an error")
383            .clone()
384    }
385
386    /// If the connection is closed, the reason why.
387    ///
388    /// Returns `None` if the connection is still open.
389    pub fn close_reason(&self) -> Option<ConnectionError> {
390        self.0.state.lock("close_reason").error.clone()
391    }
392
393    /// Close the connection immediately.
394    ///
395    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
396    /// more data is sent to the peer and the peer may drop buffered data upon receiving
397    /// the CONNECTION_CLOSE frame.
398    ///
399    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
400    ///
401    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
402    /// is preserved in full, it should be kept under 1KiB.
403    ///
404    /// # Gracefully closing a connection
405    ///
406    /// Only the peer last receiving application data can be certain that all data is
407    /// delivered. The only reliable action it can then take is to close the connection,
408    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
409    /// frame is very likely if both endpoints stay online long enough, and
410    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
411    /// remote peer will time out the connection, provided that the idle timeout is not
412    /// disabled.
413    ///
414    /// The sending side can not guarantee all stream data is delivered to the remote
415    /// application. It only knows the data is delivered to the QUIC stack of the remote
416    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
417    /// [`close()`] the remote endpoint may drop any data it received but is as yet
418    /// undelivered to the application, including data that was acknowledged as received to
419    /// the local endpoint.
420    ///
421    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
422    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
423    /// [`close()`]: Connection::close
424    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
425        let conn = &mut *self.0.state.lock("close");
426        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
427    }
428
429    /// Transmit `data` as an unreliable, unordered application datagram
430    ///
431    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
432    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
433    /// dictated by the peer.
434    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
435        let conn = &mut *self.0.state.lock("send_datagram");
436        if let Some(ref x) = conn.error {
437            return Err(SendDatagramError::ConnectionLost(x.clone()));
438        }
439        use proto::SendDatagramError::*;
440        match conn.inner.datagrams().send(data, true) {
441            Ok(()) => {
442                conn.wake();
443                Ok(())
444            }
445            Err(e) => Err(match e {
446                Blocked(..) => unreachable!(),
447                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
448                Disabled => SendDatagramError::Disabled,
449                TooLarge => SendDatagramError::TooLarge,
450            }),
451        }
452    }
453
454    /// Transmit `data` as an unreliable, unordered application datagram
455    ///
456    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
457    /// conditions, which effectively prioritizes old datagrams over new datagrams.
458    ///
459    /// See [`send_datagram()`] for details.
460    ///
461    /// [`send_datagram()`]: Connection::send_datagram
462    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
463        SendDatagram {
464            conn: &self.0,
465            data: Some(data),
466            notify: self.0.shared.datagrams_unblocked.notified(),
467        }
468    }
469
470    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
471    ///
472    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
473    ///
474    /// This may change over the lifetime of a connection according to variation in the path MTU
475    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
476    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
477    ///
478    /// Not necessarily the maximum size of received datagrams.
479    ///
480    /// [`send_datagram()`]: Connection::send_datagram
481    pub fn max_datagram_size(&self) -> Option<usize> {
482        self.0
483            .state
484            .lock("max_datagram_size")
485            .inner
486            .datagrams()
487            .max_size()
488    }
489
490    /// Bytes available in the outgoing datagram buffer
491    ///
492    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
493    /// at most this size is guaranteed not to cause older datagrams to be dropped.
494    pub fn datagram_send_buffer_space(&self) -> usize {
495        self.0
496            .state
497            .lock("datagram_send_buffer_space")
498            .inner
499            .datagrams()
500            .send_buffer_space()
501    }
502
503    /// The peer's UDP address
504    ///
505    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
506    /// switching to a cellular internet connection.
507    pub fn remote_address(&self) -> SocketAddr {
508        self.0.state.lock("remote_address").inner.remote_address()
509    }
510
511    /// The local IP address which was used when the peer established
512    /// the connection
513    ///
514    /// This can be different from the address the endpoint is bound to, in case
515    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
516    ///
517    /// This will return `None` for clients, or when the platform does not expose this
518    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
519    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
520    pub fn local_ip(&self) -> Option<IpAddr> {
521        self.0.state.lock("local_ip").inner.local_ip()
522    }
523
524    /// Current best estimate of this connection's latency (round-trip-time)
525    pub fn rtt(&self) -> Duration {
526        self.0.state.lock("rtt").inner.rtt()
527    }
528
529    /// Returns connection statistics
530    pub fn stats(&self) -> ConnectionStats {
531        self.0.state.lock("stats").inner.stats()
532    }
533
534    /// Current state of the congestion control algorithm, for debugging purposes
535    pub fn congestion_state(&self) -> Box<dyn Controller> {
536        self.0
537            .state
538            .lock("congestion_state")
539            .inner
540            .congestion_state()
541            .clone_box()
542    }
543
544    /// Parameters negotiated during the handshake
545    ///
546    /// Guaranteed to return `Some` on fully established connections or after
547    /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
548    /// the returned value.
549    ///
550    /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
551    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
552        self.0
553            .state
554            .lock("handshake_data")
555            .inner
556            .crypto_session()
557            .handshake_data()
558    }
559
560    /// Cryptographic identity of the peer
561    ///
562    /// The dynamic type returned is determined by the configured
563    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
564    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
565    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
566        self.0
567            .state
568            .lock("peer_identity")
569            .inner
570            .crypto_session()
571            .peer_identity()
572    }
573
574    /// A stable identifier for this connection
575    ///
576    /// Peer addresses and connection IDs can change, but this value will remain
577    /// fixed for the lifetime of the connection.
578    pub fn stable_id(&self) -> usize {
579        self.0.stable_id()
580    }
581
582    /// Update traffic keys spontaneously
583    ///
584    /// This primarily exists for testing purposes.
585    pub fn force_key_update(&self) {
586        self.0
587            .state
588            .lock("force_key_update")
589            .inner
590            .force_key_update()
591    }
592
593    /// Derive keying material from this connection's TLS session secrets.
594    ///
595    /// When both peers call this method with the same `label` and `context`
596    /// arguments and `output` buffers of equal length, they will get the
597    /// same sequence of bytes in `output`. These bytes are cryptographically
598    /// strong and pseudorandom, and are suitable for use as keying material.
599    ///
600    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
601    pub fn export_keying_material(
602        &self,
603        output: &mut [u8],
604        label: &[u8],
605        context: &[u8],
606    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
607        self.0
608            .state
609            .lock("export_keying_material")
610            .inner
611            .crypto_session()
612            .export_keying_material(output, label, context)
613    }
614
615    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
616    ///
617    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
618    /// `count`s increase both minimum and worst-case memory consumption.
619    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
620        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
621        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
622        // May need to send MAX_STREAMS to make progress
623        conn.wake();
624    }
625
626    /// See [`proto::TransportConfig::receive_window()`]
627    pub fn set_receive_window(&self, receive_window: VarInt) {
628        let mut conn = self.0.state.lock("set_receive_window");
629        conn.inner.set_receive_window(receive_window);
630        conn.wake();
631    }
632
633    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
634    ///
635    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
636    /// `count`s increase both minimum and worst-case memory consumption.
637    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
638        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
639        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
640        // May need to send MAX_STREAMS to make progress
641        conn.wake();
642    }
643
644    /// Track changed on our external address as reported by the peer.
645    pub fn observed_external_addr(&self) -> watch::Receiver<Option<SocketAddr>> {
646        let conn = self.0.state.lock("external_addr");
647        conn.observed_external_addr.subscribe()
648    }
649}
650
651pin_project! {
652    /// Future produced by [`Connection::open_uni`]
653    pub struct OpenUni<'a> {
654        conn: &'a ConnectionRef,
655        #[pin]
656        notify: Notified<'a>,
657    }
658}
659
660impl Future for OpenUni<'_> {
661    type Output = Result<SendStream, ConnectionError>;
662    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
663        let this = self.project();
664        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
665        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
666    }
667}
668
669pin_project! {
670    /// Future produced by [`Connection::open_bi`]
671    pub struct OpenBi<'a> {
672        conn: &'a ConnectionRef,
673        #[pin]
674        notify: Notified<'a>,
675    }
676}
677
678impl Future for OpenBi<'_> {
679    type Output = Result<(SendStream, RecvStream), ConnectionError>;
680    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
681        let this = self.project();
682        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
683
684        Poll::Ready(Ok((
685            SendStream::new(conn.clone(), id, is_0rtt),
686            RecvStream::new(conn, id, is_0rtt),
687        )))
688    }
689}
690
691fn poll_open<'a>(
692    ctx: &mut Context<'_>,
693    conn: &'a ConnectionRef,
694    mut notify: Pin<&mut Notified<'a>>,
695    dir: Dir,
696) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
697    let mut state = conn.state.lock("poll_open");
698    if let Some(ref e) = state.error {
699        return Poll::Ready(Err(e.clone()));
700    } else if let Some(id) = state.inner.streams().open(dir) {
701        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
702        drop(state); // Release the lock so clone can take it
703        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
704    }
705    loop {
706        match notify.as_mut().poll(ctx) {
707            // `state` lock ensures we didn't race with readiness
708            Poll::Pending => return Poll::Pending,
709            // Spurious wakeup, get a new future
710            Poll::Ready(()) => {
711                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
712            }
713        }
714    }
715}
716
717pin_project! {
718    /// Future produced by [`Connection::accept_uni`]
719    pub struct AcceptUni<'a> {
720        conn: &'a ConnectionRef,
721        #[pin]
722        notify: Notified<'a>,
723    }
724}
725
726impl Future for AcceptUni<'_> {
727    type Output = Result<RecvStream, ConnectionError>;
728
729    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
730        let this = self.project();
731        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
732        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
733    }
734}
735
736pin_project! {
737    /// Future produced by [`Connection::accept_bi`]
738    pub struct AcceptBi<'a> {
739        conn: &'a ConnectionRef,
740        #[pin]
741        notify: Notified<'a>,
742    }
743}
744
745impl Future for AcceptBi<'_> {
746    type Output = Result<(SendStream, RecvStream), ConnectionError>;
747
748    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
749        let this = self.project();
750        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
751        Poll::Ready(Ok((
752            SendStream::new(conn.clone(), id, is_0rtt),
753            RecvStream::new(conn, id, is_0rtt),
754        )))
755    }
756}
757
758fn poll_accept<'a>(
759    ctx: &mut Context<'_>,
760    conn: &'a ConnectionRef,
761    mut notify: Pin<&mut Notified<'a>>,
762    dir: Dir,
763) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
764    let mut state = conn.state.lock("poll_accept");
765    // Check for incoming streams before checking `state.error` so that already-received streams,
766    // which are necessarily finite, can be drained from a closed connection.
767    if let Some(id) = state.inner.streams().accept(dir) {
768        let is_0rtt = state.inner.is_handshaking();
769        state.wake(); // To send additional stream ID credit
770        drop(state); // Release the lock so clone can take it
771        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
772    } else if let Some(ref e) = state.error {
773        return Poll::Ready(Err(e.clone()));
774    }
775    loop {
776        match notify.as_mut().poll(ctx) {
777            // `state` lock ensures we didn't race with readiness
778            Poll::Pending => return Poll::Pending,
779            // Spurious wakeup, get a new future
780            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
781        }
782    }
783}
784
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    ///
    /// Resolves to the next buffered datagram, or to the connection's error if no datagram is
    /// buffered and an error is set.
    pub struct ReadDatagram<'a> {
        // Connection whose datagram queue is read
        conn: &'a ConnectionRef,
        // Wakeup registration polled while waiting; replaced with a fresh one each time it
        // completes without a datagram being available
        #[pin]
        notify: Notified<'a>,
    }
}
793
794impl Future for ReadDatagram<'_> {
795    type Output = Result<Bytes, ConnectionError>;
796    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
797        let mut this = self.project();
798        let mut state = this.conn.state.lock("ReadDatagram::poll");
799        // Check for buffered datagrams before checking `state.error` so that already-received
800        // datagrams, which are necessarily finite, can be drained from a closed connection.
801        if let Some(x) = state.inner.datagrams().recv() {
802            return Poll::Ready(Ok(x));
803        } else if let Some(ref e) = state.error {
804            return Poll::Ready(Err(e.clone()));
805        }
806        loop {
807            match this.notify.as_mut().poll(ctx) {
808                // `state` lock ensures we didn't race with readiness
809                Poll::Pending => return Poll::Pending,
810                // Spurious wakeup, get a new future
811                Poll::Ready(()) => this
812                    .notify
813                    .set(this.conn.shared.datagram_received.notified()),
814            }
815        }
816    }
817}
818
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    ///
    /// Resolves once the datagram has been handed to the connection, or with a
    /// [`SendDatagramError`] if it cannot be sent.
    pub struct SendDatagram<'a> {
        // Connection the datagram is sent on
        conn: &'a ConnectionRef,
        // Datagram still waiting to be queued; taken on each poll and put back when the send
        // is blocked
        data: Option<Bytes>,
        // Wakeup registration polled while the send is blocked; replaced with a fresh one
        // each time it completes
        #[pin]
        notify: Notified<'a>,
    }
}
828
829impl Future for SendDatagram<'_> {
830    type Output = Result<(), SendDatagramError>;
831    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
832        let mut this = self.project();
833        let mut state = this.conn.state.lock("SendDatagram::poll");
834        if let Some(ref e) = state.error {
835            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
836        }
837        use proto::SendDatagramError::*;
838        match state
839            .inner
840            .datagrams()
841            .send(this.data.take().unwrap(), false)
842        {
843            Ok(()) => {
844                state.wake();
845                Poll::Ready(Ok(()))
846            }
847            Err(e) => Poll::Ready(Err(match e {
848                Blocked(data) => {
849                    this.data.replace(data);
850                    loop {
851                        match this.notify.as_mut().poll(ctx) {
852                            Poll::Pending => return Poll::Pending,
853                            // Spurious wakeup, get a new future
854                            Poll::Ready(()) => this
855                                .notify
856                                .set(this.conn.shared.datagrams_unblocked.notified()),
857                        }
858                    }
859                }
860                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
861                Disabled => SendDatagramError::Disabled,
862                TooLarge => SendDatagramError::TooLarge,
863            })),
864        }
865    }
866}
867
/// Shared handle to a connection's internals ([`ConnectionInner`]).
///
/// Cloning increments `State::ref_count` and dropping decrements it; when the count reaches
/// zero on drop and the connection is not yet closed, the connection is implicitly closed.
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
870
871impl ConnectionRef {
872    #[allow(clippy::too_many_arguments)]
873    fn new(
874        handle: ConnectionHandle,
875        conn: proto::Connection,
876        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
877        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
878        on_handshake_data: oneshot::Sender<()>,
879        on_connected: oneshot::Sender<bool>,
880        sender: Pin<Box<dyn UdpSender>>,
881        runtime: Arc<dyn Runtime>,
882    ) -> Self {
883        Self(Arc::new(ConnectionInner {
884            state: Mutex::new(State {
885                inner: conn,
886                driver: None,
887                handle,
888                on_handshake_data: Some(on_handshake_data),
889                on_connected: Some(on_connected),
890                connected: false,
891                timer: None,
892                timer_deadline: None,
893                conn_events,
894                endpoint_events,
895                blocked_writers: FxHashMap::default(),
896                blocked_readers: FxHashMap::default(),
897                stopped: FxHashMap::default(),
898                error: None,
899                ref_count: 0,
900                sender,
901                runtime,
902                send_buffer: Vec::new(),
903                buffered_transmit: None,
904                observed_external_addr: watch::Sender::new(None),
905            }),
906            shared: Shared::default(),
907        }))
908    }
909
910    fn stable_id(&self) -> usize {
911        &*self.0 as *const _ as usize
912    }
913}
914
915impl Clone for ConnectionRef {
916    fn clone(&self) -> Self {
917        self.state.lock("clone").ref_count += 1;
918        Self(self.0.clone())
919    }
920}
921
922impl Drop for ConnectionRef {
923    fn drop(&mut self) {
924        let conn = &mut *self.state.lock("drop");
925        if let Some(x) = conn.ref_count.checked_sub(1) {
926            conn.ref_count = x;
927            if x == 0 && !conn.inner.is_closed() {
928                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
929                // not, we can't do any harm. If there were any streams being opened, then either
930                // the connection will be closed for an unrelated reason or a fresh reference will
931                // be constructed for the newly opened stream.
932                conn.implicit_close(&self.shared);
933            }
934        }
935    }
936}
937
938impl std::ops::Deref for ConnectionRef {
939    type Target = ConnectionInner;
940    fn deref(&self) -> &Self::Target {
941        &self.0
942    }
943}
944
/// Connection internals shared by every [`ConnectionRef`] to the same connection.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// Mutable connection state; must be locked for every access
    pub(crate) state: Mutex<State>,
    /// Notifiers used to wake futures waiting on connection-level events
    pub(crate) shared: Shared,
}
950
/// A handle to some connection internals; use with care.
///
/// This contains a weak reference to the connection, so it will not itself keep the
/// connection alive.
#[derive(Debug)]
pub struct WeakConnectionHandle(Weak<ConnectionInner>);
957
958impl WeakConnectionHandle {
959    /// Returns `true` if the [`Connection`] associated with this handle is still alive.
960    pub fn is_alive(&self) -> bool {
961        self.0.upgrade().is_some()
962    }
963
964    /// Resets path-specific state.
965    ///
966    /// This resets several subsystems keeping state for a specific network path.  It is
967    /// useful if it is known that the underlying network path changed substantially.
968    ///
969    /// Currently resets:
970    /// - RTT Estimator
971    /// - Congestion Controller
972    /// - MTU Discovery
973    ///
974    /// # Returns
975    ///
976    /// `true` if the connection still existed and the congestion controller state was
977    /// reset.  `false` otherwise.
978    pub fn network_path_changed(&self) -> bool {
979        if let Some(inner) = self.0.upgrade() {
980            let mut inner_state = inner.state.lock("reset-congestion-state");
981            inner_state.inner.path_changed(Instant::now());
982            true
983        } else {
984            false
985        }
986    }
987}
988
/// Notifiers shared between the connection state and the futures waiting on it.
///
/// The two-element arrays are indexed by `Dir as usize`.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    /// Notified when a datagram has been received, and when the connection terminates
    datagram_received: Notify,
    /// Notified when datagram sending is unblocked, and when the connection terminates
    datagrams_unblocked: Notify,
    /// Notified when the connection terminates
    closed: Notify,
}
1000
/// Mutable per-connection state, kept behind the mutex in [`ConnectionInner`].
pub(crate) struct State {
    /// The `proto::Connection` that all protocol processing is delegated to
    pub(crate) inner: proto::Connection,
    /// Waker taken and invoked by [`State::wake`]
    driver: Option<Waker>,
    /// Handle attached to every event sent on `endpoint_events`
    handle: ConnectionHandle,
    /// Fired on the `HandshakeDataReady` event, or when the connection terminates
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Sent `inner.accepted_0rtt()` on the `Connected` event, or `false` when the connection
    /// terminates first
    on_connected: Option<oneshot::Sender<bool>>,
    /// Set once the `Connected` event has been seen
    connected: bool,
    /// Timer armed for the deadline reported by `inner.poll_timeout()`
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    /// Deadline `timer` is currently set to; `None` when no timeout is pending
    timer_deadline: Option<Instant>,
    /// Incoming events addressed to this connection
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Outgoing events for the endpoint, tagged with `handle`
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Wakers woken when their stream becomes writable or is stopped
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Wakers woken when their stream becomes readable
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Wakers woken when their stream is finished or stopped
    pub(crate) stopped: FxHashMap<StreamId, Waker>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    /// Sender used to transmit outgoing datagrams; replaced on a `Rebind` event
    sender: Pin<Box<dyn UdpSender>>,
    /// Runtime providing the current time and timers
    runtime: Arc<dyn Runtime>,
    /// Buffer that `inner.poll_transmit` writes outgoing data into
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<proto::Transmit>,
    /// Our last external address reported by the peer.
    pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
}
1027
1028impl State {
1029    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1030        let now = self.runtime.now();
1031        let mut transmits = 0;
1032
1033        let max_datagrams = self.sender.max_transmit_segments();
1034
1035        loop {
1036            // Retry the last transmit, or get a new one.
1037            let t = match self.buffered_transmit.take() {
1038                Some(t) => t,
1039                None => {
1040                    self.send_buffer.clear();
1041                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
1042                    match self
1043                        .inner
1044                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1045                    {
1046                        Some(t) => {
1047                            transmits += match t.segment_size {
1048                                None => 1,
1049                                Some(s) => (t.size + s - 1) / s, // round up
1050                            };
1051                            t
1052                        }
1053                        None => break,
1054                    }
1055                }
1056            };
1057
1058            let len = t.size;
1059            match self
1060                .sender
1061                .as_mut()
1062                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1063            {
1064                Poll::Pending => {
1065                    self.buffered_transmit = Some(t);
1066                    return Ok(false);
1067                }
1068                Poll::Ready(Err(e)) => return Err(e),
1069                Poll::Ready(Ok(())) => {}
1070            }
1071
1072            if transmits >= MAX_TRANSMIT_DATAGRAMS {
1073                // TODO: What isn't ideal here yet is that if we don't poll all
1074                // datagrams that could be sent we don't go into the `app_limited`
1075                // state and CWND continues to grow until we get here the next time.
1076                // See https://github.com/quinn-rs/quinn/issues/1126
1077                return Ok(true);
1078            }
1079        }
1080
1081        Ok(false)
1082    }
1083
1084    fn forward_endpoint_events(&mut self) {
1085        while let Some(event) = self.inner.poll_endpoint_events() {
1086            // If the endpoint driver is gone, noop.
1087            let _ = self.endpoint_events.send((self.handle, event));
1088        }
1089    }
1090
1091    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1092    fn process_conn_events(
1093        &mut self,
1094        shared: &Shared,
1095        cx: &mut Context,
1096    ) -> Result<(), ConnectionError> {
1097        loop {
1098            match self.conn_events.poll_recv(cx) {
1099                Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
1100                    self.sender = sender;
1101                    self.inner.local_address_changed();
1102                }
1103                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1104                    self.inner.handle_event(event);
1105                }
1106                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1107                    self.close(error_code, reason, shared);
1108                }
1109                Poll::Ready(None) => {
1110                    return Err(ConnectionError::TransportError(proto::TransportError {
1111                        code: proto::TransportErrorCode::INTERNAL_ERROR,
1112                        frame: None,
1113                        reason: "endpoint driver future was dropped".to_string(),
1114                    }));
1115                }
1116                Poll::Pending => {
1117                    return Ok(());
1118                }
1119            }
1120        }
1121    }
1122
1123    fn forward_app_events(&mut self, shared: &Shared) {
1124        while let Some(event) = self.inner.poll() {
1125            use proto::Event::*;
1126            match event {
1127                HandshakeDataReady => {
1128                    if let Some(x) = self.on_handshake_data.take() {
1129                        let _ = x.send(());
1130                    }
1131                }
1132                Connected => {
1133                    self.connected = true;
1134                    if let Some(x) = self.on_connected.take() {
1135                        // We don't care if the on-connected future was dropped
1136                        let _ = x.send(self.inner.accepted_0rtt());
1137                    }
1138                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1139                        // Wake up rejected 0-RTT streams so they can fail immediately with
1140                        // `ZeroRttRejected` errors.
1141                        wake_all(&mut self.blocked_writers);
1142                        wake_all(&mut self.blocked_readers);
1143                        wake_all(&mut self.stopped);
1144                    }
1145                }
1146                ConnectionLost { reason } => {
1147                    self.terminate(reason, shared);
1148                }
1149                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1150                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1151                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1152                }
1153                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1154                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1155                }
1156                DatagramReceived => {
1157                    shared.datagram_received.notify_waiters();
1158                }
1159                DatagramsUnblocked => {
1160                    shared.datagrams_unblocked.notify_waiters();
1161                }
1162                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1163                Stream(StreamEvent::Available { dir }) => {
1164                    // Might mean any number of streams are ready, so we wake up everyone
1165                    shared.stream_budget_available[dir as usize].notify_waiters();
1166                }
1167                Stream(StreamEvent::Finished { id }) => wake_stream(id, &mut self.stopped),
1168                Stream(StreamEvent::Stopped { id, .. }) => {
1169                    wake_stream(id, &mut self.stopped);
1170                    wake_stream(id, &mut self.blocked_writers);
1171                }
1172                ObservedAddr(observed) => {
1173                    self.observed_external_addr.send_if_modified(|addr| {
1174                        let old = addr.replace(observed);
1175                        old != *addr
1176                    });
1177                }
1178            }
1179        }
1180    }
1181
1182    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1183        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1184        // timer is registered with the runtime (and check whether it's already
1185        // expired).
1186        match self.inner.poll_timeout() {
1187            Some(deadline) => {
1188                if let Some(delay) = &mut self.timer {
1189                    // There is no need to reset the tokio timer if the deadline
1190                    // did not change
1191                    if self
1192                        .timer_deadline
1193                        .map(|current_deadline| current_deadline != deadline)
1194                        .unwrap_or(true)
1195                    {
1196                        delay.as_mut().reset(deadline);
1197                    }
1198                } else {
1199                    self.timer = Some(self.runtime.new_timer(deadline));
1200                }
1201                // Store the actual expiration time of the timer
1202                self.timer_deadline = Some(deadline);
1203            }
1204            None => {
1205                self.timer_deadline = None;
1206                return false;
1207            }
1208        }
1209
1210        if self.timer_deadline.is_none() {
1211            return false;
1212        }
1213
1214        let delay = self
1215            .timer
1216            .as_mut()
1217            .expect("timer must exist in this state")
1218            .as_mut();
1219        if delay.poll(cx).is_pending() {
1220            // Since there wasn't a timeout event, there is nothing new
1221            // for the connection to do
1222            return false;
1223        }
1224
1225        // A timer expired, so the caller needs to check for
1226        // new transmits, which might cause new timers to be set.
1227        self.inner.handle_timeout(self.runtime.now());
1228        self.timer_deadline = None;
1229        true
1230    }
1231
1232    /// Wake up a blocked `Driver` task to process I/O
1233    pub(crate) fn wake(&mut self) {
1234        if let Some(x) = self.driver.take() {
1235            x.wake();
1236        }
1237    }
1238
1239    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1240    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1241        self.error = Some(reason.clone());
1242        if let Some(x) = self.on_handshake_data.take() {
1243            let _ = x.send(());
1244        }
1245        wake_all(&mut self.blocked_writers);
1246        wake_all(&mut self.blocked_readers);
1247        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1248        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1249        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1250        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1251        shared.datagram_received.notify_waiters();
1252        shared.datagrams_unblocked.notify_waiters();
1253        if let Some(x) = self.on_connected.take() {
1254            let _ = x.send(false);
1255        }
1256        wake_all(&mut self.stopped);
1257        shared.closed.notify_waiters();
1258    }
1259
1260    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1261        self.inner.close(self.runtime.now(), error_code, reason);
1262        self.terminate(ConnectionError::LocallyClosed, shared);
1263        self.wake();
1264    }
1265
1266    /// Close for a reason other than the application's explicit request
1267    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1268        self.close(0u32.into(), Bytes::new(), shared);
1269    }
1270
1271    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1272        if self.inner.is_handshaking()
1273            || self.inner.accepted_0rtt()
1274            || self.inner.side().is_server()
1275        {
1276            Ok(())
1277        } else {
1278            Err(())
1279        }
1280    }
1281}
1282
1283impl Drop for State {
1284    fn drop(&mut self) {
1285        if !self.inner.is_drained() {
1286            // Ensure the endpoint can tidy up
1287            let _ = self
1288                .endpoint_events
1289                .send((self.handle, proto::EndpointEvent::drained()));
1290        }
1291    }
1292}
1293
1294impl fmt::Debug for State {
1295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1296        f.debug_struct("State").field("inner", &self.inner).finish()
1297    }
1298}
1299
1300fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1301    if let Some(waker) = wakers.remove(&stream_id) {
1302        waker.wake();
1303    }
1304}
1305
1306fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1307    wakers.drain().for_each(|(_, waker)| waker.wake())
1308}
1309
/// Errors that can arise when sending a datagram
///
/// This is the error type the [`SendDatagram`] future resolves to.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost; carries the underlying [`ConnectionError`]
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1329
/// The maximum number of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
///
/// The limit is checked after each send, so a transmit made up of several segments can take
/// the count past this value before `drive_transmit` returns.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;