ant_quic/quinn_high_level/connection.rs

use std::{
    any::Any,
    fmt,
    future::Future,
    io,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker, ready},
};

use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
use tracing::{Instrument, Span, debug_span};

use super::{
    ConnectionEvent,
    mutex::Mutex,
    recv_stream::RecvStream,
    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
    send_stream::SendStream,
    udp_transmit,
};
use crate::{
    ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent, Instant, Side, StreamEvent,
    StreamId, VarInt, congestion::Controller,
};

/// In-progress connection attempt future
#[derive(Debug)]
pub struct Connecting {
    conn: Option<ConnectionRef>,
    connected: oneshot::Receiver<bool>,
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}

impl Connecting {
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();
        let conn = ConnectionRef::new(
            handle,
            conn,
            endpoint_events,
            conn_events,
            on_handshake_data_send,
            on_connected_send,
            socket,
            runtime.clone(),
        );

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    tracing::error!("I/O error: {e}");
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
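    ///
    /// ## Example
    ///
    /// A minimal sketch of the client-side flow (`connecting` is a [`Connecting`] returned by the
    /// endpoint; error handling and the async context are assumed, and the snippet is illustrative
    /// rather than exact):
    ///
    /// ```ignore
    /// match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // 0-RTT data can be replayed by an attacker: send only idempotent requests here.
    ///         let _send = connection.open_uni().await?;
    ///         // ... write early data ...
    ///         if !zero_rtt_accepted.await {
    ///             // The server rejected 0-RTT; nothing sent above reached the remote
    ///             // application, so it must be resent on fresh streams.
    ///         }
    ///     }
    ///     Err(connecting) => {
    ///         // No usable session to resume; fall back to waiting for the full handshake.
    ///         let connection = connecting.await?;
    ///     }
    /// }
    /// ```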
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");

        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            let conn = self.conn.take().unwrap();
            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                inner
                    .error
                    .clone()
                    .expect("spurious handshake data ready notification")
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> SocketAddr {
        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
        conn_ref.state.lock("remote_address").inner.remote_address()
    }
}

impl Future for Connecting {
    type Output = Result<Connection, ConnectionError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.connected).poll(cx).map(|_| {
            let conn = self.conn.take().unwrap();
            let inner = conn.state.lock("connecting");
            if inner.connected {
                drop(inner);
                Ok(Connection(conn))
            } else {
                Err(inner
                    .error
                    .clone()
                    .expect("connected signaled without connection success or error"))
            }
        })
    }
}

/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);

impl Future for ZeroRttAccepted {
    type Output = bool;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
    }
}

/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}

/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
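///
/// ## Example
///
/// An illustrative sketch of a client connection's lifetime; `endpoint` and `server_addr` are
/// placeholders, and endpoint setup plus error handling are assumed:
///
/// ```ignore
/// let connection = endpoint.connect(server_addr, "example.com")?.await?;
/// println!("connected to {} (rtt {:?})", connection.remote_address(), connection.rtt());
/// // ... open streams, exchange data ...
/// connection.close(0u32.into(), b"done");
/// // Give the CONNECTION_CLOSE frame a chance to reach the peer before dropping the endpoint.
/// endpoint.wait_idle().await;
/// ```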
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);

impl Connection {
    /// Initiate a new outgoing unidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used.
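    ///
    /// An illustrative sketch (stream method names follow the quinn-style API; error handling and
    /// the async context are assumed):
    ///
    /// ```ignore
    /// let mut send = connection.open_uni().await?;
    /// send.write_all(b"hello").await?;
    /// send.finish()?;
    /// ```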
    pub fn open_uni(&self) -> OpenUni<'_> {
        OpenUni {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
        }
    }

    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
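    ///
    /// An illustrative sketch of the resulting request/response pattern (error handling, the async
    /// context, and the `read_to_end` size limit are assumptions of the example):
    ///
    /// ```ignore
    /// let (mut send, mut recv) = connection.open_bi().await?;
    /// // Write first: the peer only learns about the stream once data is sent on it.
    /// send.write_all(b"request").await?;
    /// send.finish()?;
    /// let response = recv.read_to_end(64 * 1024).await?;
    /// ```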
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }

    /// Accept the next incoming uni-directional stream
    pub fn accept_uni(&self) -> AcceptUni<'_> {
        AcceptUni {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
        }
    }

    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: crate::Connection::accept_bi
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
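    ///
    /// A typical server-side accept loop, sketched (error handling, task spawning, and the async
    /// context are assumed):
    ///
    /// ```ignore
    /// while let Ok((mut send, mut recv)) = connection.accept_bi().await {
    ///     let request = recv.read_to_end(64 * 1024).await?;
    ///     send.write_all(b"response").await?;
    ///     send.finish()?;
    /// }
    /// ```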
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            notify: self.0.shared.datagram_received.notified(),
        }
    }

    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
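    ///
    /// Illustrative sketch: run `closed()` alongside the rest of the application so shutdown can be
    /// observed (`do_work` is a placeholder, and the `tokio::select!` usage is an assumption of the
    /// example):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     reason = connection.closed() => {
    ///         println!("connection closed: {reason}");
    ///     }
    ///     _ = do_work(&connection) => {}
    /// }
    /// ```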
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }

    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    ///
    /// The sending side can not guarantee all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`] the remote endpoint may drop any data it received but is as yet
    /// undelivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::quinn_high_level::Endpoint::wait_idle
    /// [`close()`]: Connection::close
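    ///
    /// A sketch of the pattern described above, for the side that received the last application
    /// data (`endpoint` is the endpoint that produced this connection; surrounding context is
    /// assumed):
    ///
    /// ```ignore
    /// // Tell the peer we are done; the error code and reason bytes are application-defined.
    /// connection.close(0u32.into(), b"done");
    /// // Keep the endpoint alive long enough for the CONNECTION_CLOSE frame to be delivered.
    /// endpoint.wait_idle().await;
    /// ```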
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
    ///
    /// Previously queued datagrams which are still unsent may be discarded to make space for this
    /// datagram, in order of oldest to newest.
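    ///
    /// Illustrative sketch, checking the current limit before sending (error handling and the
    /// async context are assumed):
    ///
    /// ```ignore
    /// let payload = bytes::Bytes::from_static(b"ping");
    /// if connection.max_datagram_size().is_some_and(|max| payload.len() <= max) {
    ///     connection.send_datagram(payload)?;
    /// }
    /// let reply = connection.read_datagram().await?;
    /// ```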
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use crate::SendDatagramError::*;
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }

    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }

    /// The side of the connection (client or server)
    pub fn side(&self) -> Side {
        self.0.state.lock("side").inner.side()
    }

    /// The peer's UDP address
    ///
    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
    /// switching to a cellular internet connection.
    pub fn remote_address(&self) -> SocketAddr {
        self.0.state.lock("remote_address").inner.remote_address()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self) -> Duration {
        self.0.state.lock("rtt").inner.rtt()
    }

    /// Returns connection statistics
    pub fn stats(&self) -> ConnectionStats {
        self.0.state.lock("stats").inner.stats()
    }

    /// Current state of the congestion control algorithm, for debugging purposes
    pub fn congestion_state(&self) -> Box<dyn Controller> {
        self.0
            .state
            .lock("congestion_state")
            .inner
            .congestion_state()
            .clone_box()
    }

    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }

    /// Cryptographic identity of the peer
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("peer_identity")
            .inner
            .crypto_session()
            .peer_identity()
    }

    /// A stable identifier for this connection
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection.
    pub fn stable_id(&self) -> usize {
        self.0.stable_id()
    }

    /// Update traffic keys spontaneously
    ///
    /// This primarily exists for testing purposes.
    pub fn force_key_update(&self) {
        self.0
            .state
            .lock("force_key_update")
            .inner
            .force_key_update()
    }

    /// Derive keying material from this connection's TLS session secrets.
    ///
    /// When both peers call this method with the same `label` and `context`
    /// arguments and `output` buffers of equal length, they will get the
    /// same sequence of bytes in `output`. These bytes are cryptographically
    /// strong and pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
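    ///
    /// Illustrative sketch (the label and context bytes are application-defined placeholders):
    ///
    /// ```ignore
    /// let mut key = [0u8; 32];
    /// connection.export_keying_material(&mut key, b"my-app key", b"context")?;
    /// // Both peers now hold the same 32 bytes of keying material.
    /// ```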
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
        self.0
            .state
            .lock("export_keying_material")
            .inner
            .crypto_session()
            .export_keying_material(output, label, context)
    }

    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// See [`crate::TransportConfig::receive_window()`]
    pub fn set_receive_window(&self, receive_window: VarInt) {
        let mut conn = self.0.state.lock("set_receive_window");
        conn.inner.set_receive_window(receive_window);
        conn.wake();
    }

    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// Set up qlog for this connection.
    #[cfg(feature = "__qlog")]
    pub fn set_qlog(
        &mut self,
        writer: Box<dyn std::io::Write + Send + Sync>,
        title: Option<String>,
        description: Option<String>,
    ) {
        let mut state = self.0.state.lock("__qlog");
        state
            .inner
            .set_qlog(writer, title, description, Instant::now());
    }
}

pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenUni<'_> {
    type Output = Result<SendStream, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptUni<'_> {
    type Output = Result<RecvStream, ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        if let Some(x) = state.inner.datagrams().recv() {
            return Poll::Ready(Ok(x));
        } else if let Some(ref e) = state.error {
            return Poll::Ready(Err(e.clone()));
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        conn: &'a ConnectionRef,
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use crate::SendDatagramError::*;
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

impl ConnectionRef {
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: ConnectionHandle,
        conn: crate::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        on_handshake_data: oneshot::Sender<()>,
        on_connected: oneshot::Sender<bool>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        Self(Arc::new(ConnectionInner {
            state: Mutex::new(State {
                inner: conn,
                driver: None,
                handle,
                on_handshake_data: Some(on_handshake_data),
                on_connected: Some(on_connected),
                connected: false,
                timer: None,
                timer_deadline: None,
                conn_events,
                endpoint_events,
                blocked_writers: FxHashMap::default(),
                blocked_readers: FxHashMap::default(),
                stopped: FxHashMap::default(),
                error: None,
                ref_count: 0,
                io_poller: socket.clone().create_io_poller(),
                socket,
                runtime,
                send_buffer: Vec::new(),
                buffered_transmit: None,
            }),
            shared: Shared::default(),
        }))
    }

    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}

impl Clone for ConnectionRef {
    fn clone(&self) -> Self {
        self.state.lock("clone").ref_count += 1;
        Self(self.0.clone())
    }
}

impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}

impl std::ops::Deref for ConnectionRef {
    type Target = ConnectionInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionInner {
    pub(crate) state: Mutex<State>,
    pub(crate) shared: Shared,
}

#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    datagram_received: Notify,
    datagrams_unblocked: Notify,
    closed: Notify,
}

pub(crate) struct State {
    pub(crate) inner: crate::Connection,
    driver: Option<Waker>,
    handle: ConnectionHandle,
    on_handshake_data: Option<oneshot::Sender<()>>,
    on_connected: Option<oneshot::Sender<bool>>,
    connected: bool,
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    socket: Arc<dyn AsyncUdpSocket>,
    io_poller: Pin<Box<dyn UdpPoller>>,
    runtime: Arc<dyn Runtime>,
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<crate::Transmit>,
}

impl State {
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

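        // `max_datagrams` bounds how many UDP datagrams a single `poll_transmit` call may
        // produce as one GSO batch: capped by what the socket reports it can coalesce into
        // one `try_send`, and by `MAX_TRANSMIT_SEGMENTS` to keep buffer growth modest.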
        let max_datagrams = self
            .socket
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
                // Retry after a future wakeup
                self.buffered_transmit = Some(t);
                return Ok(false);
            }

            let len = t.size;
            let retry = match self
                .socket
                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
            {
                Ok(()) => false,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
                Err(e) => return Err(e),
            };
            if retry {
                // We thought the socket was writable, but it wasn't. Retry so that either another
                // `poll_writable` call determines that the socket is indeed not writable and
                // registers us for a wakeup, or the send succeeds if this really was just a
                // transient failure.
                self.buffered_transmit = Some(t);
                continue;
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }

    fn forward_endpoint_events(&mut self) {
        while let Some(event) = self.inner.poll_endpoint_events() {
            // If the endpoint driver is gone, noop.
            let _ = self.endpoint_events.send((self.handle, event));
        }
    }

    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
                    self.socket = socket;
                    self.io_poller = self.socket.clone().create_io_poller();
                    self.inner.local_address_changed();
                }
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    self.inner.handle_event(event);
                }
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                Poll::Ready(None) => {
                    return Err(ConnectionError::TransportError(crate::TransportError {
                        code: crate::TransportErrorCode::INTERNAL_ERROR,
                        frame: None,
                        reason: "endpoint driver future was dropped".to_string(),
                    }));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }

    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use crate::Event::*;
            match event {
                HandshakeDataReady => {
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all_notify(&mut self.stopped);
                    }
                }
                ConnectionLost { reason } => {
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    wake_stream_notify(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
            }
        }
    }

    fn drive_timer(&mut self, cx: &mut Context) -> bool {
        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
        // timer is registered with the runtime (and check whether it's already
        // expired).
        match self.inner.poll_timeout() {
            Some(deadline) => {
                if let Some(delay) = &mut self.timer {
                    // There is no need to reset the tokio timer if the deadline
                    // did not change
                    if self
                        .timer_deadline
                        .map(|current_deadline| current_deadline != deadline)
                        .unwrap_or(true)
                    {
                        delay.as_mut().reset(deadline);
                    }
                } else {
                    self.timer = Some(self.runtime.new_timer(deadline));
                }
                // Store the actual expiration time of the timer
                self.timer_deadline = Some(deadline);
            }
            None => {
                self.timer_deadline = None;
                return false;
            }
        }

        if self.timer_deadline.is_none() {
            return false;
        }

        let delay = self
            .timer
            .as_mut()
            .expect("timer must exist in this state")
            .as_mut();
        if delay.poll(cx).is_pending() {
            // Since there wasn't a timeout event, there is nothing new
            // for the connection to do
            return false;
        }

        // A timer expired, so the caller needs to check for
        // new transmits, which might cause new timers to be set.
        self.inner.handle_timeout(self.runtime.now());
        self.timer_deadline = None;
        true
    }

    /// Wake up a blocked `Driver` task to process I/O
    pub(crate) fn wake(&mut self) {
        if let Some(x) = self.driver.take() {
            x.wake();
        }
    }

    /// Used to wake up all blocked futures when the connection becomes closed for any reason
    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
        self.error = Some(reason.clone());
        if let Some(x) = self.on_handshake_data.take() {
            let _ = x.send(());
        }
        wake_all(&mut self.blocked_writers);
        wake_all(&mut self.blocked_readers);
        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
        shared.datagram_received.notify_waiters();
        shared.datagrams_unblocked.notify_waiters();
        if let Some(x) = self.on_connected.take() {
            let _ = x.send(false);
        }
        wake_all_notify(&mut self.stopped);
        shared.closed.notify_waiters();
    }

    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        self.wake();
    }

    /// Close for a reason other than the application's explicit request
    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
        self.close(0u32.into(), Bytes::new(), shared);
    }

    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
        if self.inner.is_handshaking()
            || self.inner.accepted_0rtt()
            || self.inner.side().is_server()
        {
            Ok(())
        } else {
            Err(())
        }
    }
}

impl Drop for State {
    fn drop(&mut self) {
        if !self.inner.is_drained() {
            // Ensure the endpoint can tidy up
            let _ = self
                .endpoint_events
                .send((self.handle, crate::EndpointEvent::drained()));
        }
    }
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("State").field("inner", &self.inner).finish()
    }
}

fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
    if let Some(waker) = wakers.remove(&stream_id) {
        waker.wake();
    }
}

fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
    wakers.drain().for_each(|(_, waker)| waker.wake())
}

fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    if let Some(notify) = wakers.remove(&stream_id) {
        notify.notify_waiters()
    }
}

fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    wakers
        .drain()
        .for_each(|(_, notify)| notify.notify_waiters())
}

/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}

/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum amount of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;