ant_quic/high_level/connection.rs
1use std::{
2 any::Any,
3 fmt,
4 future::Future,
5 io,
6 net::{IpAddr, SocketAddr},
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll, Waker, ready},
10};
11
12use bytes::Bytes;
13use pin_project_lite::pin_project;
14use rustc_hash::FxHashMap;
15use thiserror::Error;
16use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
17use tracing::{Instrument, Span, debug_span};
18
19use super::{
20 ConnectionEvent,
21 mutex::Mutex,
22 recv_stream::RecvStream,
23 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
24 send_stream::SendStream,
25 udp_transmit,
26};
27use crate::{
28 ConnectionError, ConnectionHandle, ConnectionStats, Dir, Duration, EndpointEvent, Instant,
29 Side, StreamEvent, StreamId, VarInt, congestion::Controller,
30};
31
/// In-progress connection attempt future
///
/// Awaiting it yields the [`Connection`] on success, or the [`ConnectionError`] recorded in the
/// connection state otherwise.
#[derive(Debug)]
pub struct Connecting {
    /// Handle to the connection being established. Left as `None` once it has been handed out
    /// by `poll` or `into_0rtt`.
    conn: Option<ConnectionRef>,
    /// Receiving half of the `on_connected` oneshot that `new` hands to the connection state.
    connected: oneshot::Receiver<bool>,
    /// Receiving half of the `on_handshake_data` oneshot; consumed by the first call to
    /// `handshake_data`.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
39
40impl Connecting {
41 pub(crate) fn new(
42 handle: ConnectionHandle,
43 conn: crate::Connection,
44 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
45 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
46 socket: Arc<dyn AsyncUdpSocket>,
47 runtime: Arc<dyn Runtime>,
48 ) -> Self {
49 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
50 let (on_connected_send, on_connected_recv) = oneshot::channel();
51 let conn = ConnectionRef::new(
52 handle,
53 conn,
54 endpoint_events,
55 conn_events,
56 on_handshake_data_send,
57 on_connected_send,
58 socket,
59 runtime.clone(),
60 );
61
62 let driver = ConnectionDriver(conn.clone());
63 runtime.spawn(Box::pin(
64 async {
65 if let Err(e) = driver.await {
66 tracing::error!("I/O error: {e}");
67 }
68 }
69 .instrument(Span::current()),
70 ));
71
72 Self {
73 conn: Some(conn),
74 connected: on_connected_recv,
75 handshake_data_ready: Some(on_handshake_data_recv),
76 }
77 }
78
79 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
80 ///
81 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
82 /// If so, the returned [`Connection`] can be used to send application data without waiting for
83 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
84 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
85 /// complete, at which point subsequently opened streams and written data will have full
86 /// cryptographic protection.
87 ///
88 /// ## Outgoing
89 ///
90 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
91 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
92 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
93 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
94 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
95 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
96 /// If it was rejected, the existence of streams opened and other application data sent prior
97 /// to the handshake completing will not be conveyed to the remote application, and local
98 /// operations on them will return `ZeroRttRejected` errors.
99 ///
100 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
101 /// relevant resumption state to be stored in the server, which servers may limit or lose for
102 /// various reasons including not persisting resumption state across server restarts.
103 ///
104 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
105 /// implementation's docs for 0-RTT pitfalls.
106 ///
107 /// ## Incoming
108 ///
109 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
110 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
111 ///
112 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
113 /// implementation's docs for 0-RTT pitfalls.
114 ///
115 /// ## Security
116 ///
117 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
118 /// replay attacks, and should therefore never invoke non-idempotent operations.
119 ///
120 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
121 /// before TLS client authentication has occurred, and should therefore not be used to send
122 /// data for which client authentication is being used.
123 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
124 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
125 // have to release it explicitly before returning `self` by value.
126 let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");
127
128 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
129 drop(conn);
130
131 if is_ok {
132 match self.conn.take() {
133 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
134 None => {
135 tracing::error!("Connection state missing during 0-RTT acceptance");
136 Err(self)
137 }
138 }
139 } else {
140 Err(self)
141 }
142 }
143
144 /// Parameters negotiated during the handshake
145 ///
146 /// The dynamic type returned is determined by the configured
147 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
148 /// be [`downcast`](Box::downcast) to a
149 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
150 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
151 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
152 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
153 // simple.
154 if let Some(x) = self.handshake_data_ready.take() {
155 let _ = x.await;
156 }
157 let conn = self.conn.as_ref().ok_or_else(|| {
158 tracing::error!("Connection state missing while retrieving handshake data");
159 ConnectionError::LocallyClosed
160 })?;
161 let inner = conn.state.lock("handshake");
162 inner
163 .inner
164 .crypto_session()
165 .handshake_data()
166 .ok_or_else(|| {
167 inner
168 .error
169 .clone()
170 .expect("spurious handshake data ready notification")
171 })
172 }
173
174 /// The local IP address which was used when the peer established
175 /// the connection
176 ///
177 /// This can be different from the address the endpoint is bound to, in case
178 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
179 ///
180 /// This will return `None` for clients, or when the platform does not expose this
181 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
182 /// supported platforms when using quinn_udp for I/O, which is the default.
183 ///
184 /// Will panic if called after `poll` has returned `Ready`.
185 pub fn local_ip(&self) -> Option<IpAddr> {
186 let conn = self.conn.as_ref().unwrap();
187 let inner = conn.state.lock("local_ip");
188
189 inner.inner.local_ip()
190 }
191
192 /// The peer's UDP address
193 ///
194 /// Will panic if called after `poll` has returned `Ready`.
195 pub fn remote_address(&self) -> SocketAddr {
196 let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
197 conn_ref.state.lock("remote_address").inner.remote_address()
198 }
199}
200
201impl Future for Connecting {
202 type Output = Result<Connection, ConnectionError>;
203 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
204 Pin::new(&mut self.connected).poll(cx).map(|_| {
205 let conn = self.conn.take().unwrap();
206 let inner = conn.state.lock("connecting");
207 if inner.connected {
208 drop(inner);
209 Ok(Connection(conn))
210 } else {
211 Err(inner
212 .error
213 .clone()
214 .expect("connected signaled without connection success or error"))
215 }
216 })
217 }
218}
219
/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
///
/// Resolves to `false` if the sending half of the channel is dropped without sending a value.
// Wraps the `connected` receiver that `Connecting::into_0rtt` moves out of `Connecting`.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
225
226impl Future for ZeroRttAccepted {
227 type Output = bool;
228 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
229 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
230 }
231}
232
233/// A future that drives protocol logic for a connection
234///
235/// This future handles the protocol logic for a single connection, routing events from the
236/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
237/// It also keeps track of outstanding timeouts for the `Connection`.
238///
239/// If the connection encounters an error condition, this future will yield an error. It will
240/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
241/// connection-related futures, this waits for the draining period to complete to ensure that
242/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
// Wraps the `ConnectionRef` clone created in `Connecting::new`; `poll` locks its state on every
// invocation.
struct ConnectionDriver(ConnectionRef);
246
impl Future for ConnectionDriver {
    // An `Err` is only produced by `drive_transmit` (propagated with `?` below).
    type Output = Result<(), io::Error>;

    /// Runs one iteration of the connection's event loop while holding the state lock.
    ///
    /// Returns `Pending` until the connection is drained, re-waking itself immediately when
    /// transmit or timer handling reported more work, and otherwise parking the waker in
    /// `conn.driver`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The lock is held for the whole call; `conn` borrows the guard.
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // A failure while processing connection events terminates the connection, and the
        // driver finishes with `Ok(())` rather than an I/O error.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Nothing left to do right now: store the waker in the shared state.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Invariant check: reaching the drained state without a recorded error is a bug.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
282
283/// A QUIC connection.
284///
285/// If all references to a connection (including every clone of the `Connection` handle, streams of
286/// incoming streams, and the various stream types) have been dropped, then the connection will be
287/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
288/// connection explicitly by calling [`Connection::close()`].
289///
290/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
291/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
292/// application. [`Connection::close()`] describes in more detail how to gracefully close a
293/// connection without losing application data.
294///
295/// May be cloned to obtain another handle to the same connection.
296///
297/// [`Connection::close()`]: Connection::close
// Newtype over the shared `ConnectionRef`; cloning a `Connection` clones that handle, which
// also increments the `ref_count` kept in the connection state.
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
300
301impl Connection {
302 /// Initiate a new outgoing unidirectional stream.
303 ///
304 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
305 /// consequence, the peer won't be notified that a stream has been opened until the stream is
306 /// actually used.
307 pub fn open_uni(&self) -> OpenUni<'_> {
308 OpenUni {
309 conn: &self.0,
310 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
311 }
312 }
313
314 /// Initiate a new outgoing bidirectional stream.
315 ///
316 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
317 /// consequence, the peer won't be notified that a stream has been opened until the stream is
318 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
319 /// anything to [`SendStream`] will never succeed.
320 ///
321 /// [`open_bi()`]: Self::open_bi
322 /// [`SendStream`]: crate::SendStream
323 /// [`RecvStream`]: crate::RecvStream
324 pub fn open_bi(&self) -> OpenBi<'_> {
325 OpenBi {
326 conn: &self.0,
327 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
328 }
329 }
330
331 /// Accept the next incoming uni-directional stream
332 pub fn accept_uni(&self) -> AcceptUni<'_> {
333 AcceptUni {
334 conn: &self.0,
335 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
336 }
337 }
338
339 /// Accept the next incoming bidirectional stream
340 ///
341 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
342 /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
343 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
344 ///
345 /// [`accept_bi()`]: Self::accept_bi
346 /// [`open_bi()`]: Self::open_bi
347 /// [`SendStream`]: crate::SendStream
348 /// [`RecvStream`]: crate::RecvStream
349 pub fn accept_bi(&self) -> AcceptBi<'_> {
350 AcceptBi {
351 conn: &self.0,
352 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
353 }
354 }
355
356 /// Receive an application datagram
357 pub fn read_datagram(&self) -> ReadDatagram<'_> {
358 ReadDatagram {
359 conn: &self.0,
360 notify: self.0.shared.datagram_received.notified(),
361 }
362 }
363
364 /// Wait for the connection to be closed for any reason
365 ///
366 /// Despite the return type's name, closed connections are often not an error condition at the
367 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
368 /// and [`ConnectionError::ApplicationClosed`].
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            // Fast path: the connection already has a recorded error.
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        // The notification has fired, so the error is expected to be recorded by now; re-lock
        // and read it.
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }
389
390 /// If the connection is closed, the reason why.
391 ///
392 /// Returns `None` if the connection is still open.
393 pub fn close_reason(&self) -> Option<ConnectionError> {
394 self.0.state.lock("close_reason").error.clone()
395 }
396
397 /// Close the connection immediately.
398 ///
399 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
400 /// more data is sent to the peer and the peer may drop buffered data upon receiving
401 /// the CONNECTION_CLOSE frame.
402 ///
403 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
404 ///
405 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
406 /// is preserved in full, it should be kept under 1KiB.
407 ///
408 /// # Gracefully closing a connection
409 ///
410 /// Only the peer last receiving application data can be certain that all data is
411 /// delivered. The only reliable action it can then take is to close the connection,
412 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
413 /// frame is very likely if both endpoints stay online long enough, and
414 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
415 /// remote peer will time out the connection, provided that the idle timeout is not
416 /// disabled.
417 ///
418 /// The sending side can not guarantee all stream data is delivered to the remote
419 /// application. It only knows the data is delivered to the QUIC stack of the remote
420 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
421 /// [`close()`] the remote endpoint may drop any data it received but is as yet
422 /// undelivered to the application, including data that was acknowledged as received to
423 /// the local endpoint.
424 ///
425 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
426 /// [`Endpoint::wait_idle()`]: crate::high_level::Endpoint::wait_idle
427 /// [`close()`]: Connection::close
428 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
429 let conn = &mut *self.0.state.lock("close");
430 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
431 }
432
433 /// Transmit `data` as an unreliable, unordered application datagram
434 ///
435 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
436 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
437 /// dictated by the peer.
438 ///
439 /// Previously queued datagrams which are still unsent may be discarded to make space for this
440 /// datagram, in order of oldest to newest.
441 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
442 let conn = &mut *self.0.state.lock("send_datagram");
443 if let Some(ref x) = conn.error {
444 return Err(SendDatagramError::ConnectionLost(x.clone()));
445 }
446 use crate::SendDatagramError::*;
447 match conn.inner.datagrams().send(data, true) {
448 Ok(()) => {
449 conn.wake();
450 Ok(())
451 }
452 Err(e) => Err(match e {
453 Blocked(..) => unreachable!(),
454 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
455 Disabled => SendDatagramError::Disabled,
456 TooLarge => SendDatagramError::TooLarge,
457 }),
458 }
459 }
460
461 /// Transmit `data` as an unreliable, unordered application datagram
462 ///
463 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
464 /// conditions, which effectively prioritizes old datagrams over new datagrams.
465 ///
466 /// See [`send_datagram()`] for details.
467 ///
468 /// [`send_datagram()`]: Connection::send_datagram
469 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
470 SendDatagram {
471 conn: &self.0,
472 data: Some(data),
473 notify: self.0.shared.datagrams_unblocked.notified(),
474 }
475 }
476
477 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
478 ///
479 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
480 ///
481 /// This may change over the lifetime of a connection according to variation in the path MTU
482 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
483 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
484 ///
485 /// Not necessarily the maximum size of received datagrams.
486 ///
487 /// [`send_datagram()`]: Connection::send_datagram
488 pub fn max_datagram_size(&self) -> Option<usize> {
489 self.0
490 .state
491 .lock("max_datagram_size")
492 .inner
493 .datagrams()
494 .max_size()
495 }
496
497 /// Bytes available in the outgoing datagram buffer
498 ///
499 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
500 /// at most this size is guaranteed not to cause older datagrams to be dropped.
501 pub fn datagram_send_buffer_space(&self) -> usize {
502 self.0
503 .state
504 .lock("datagram_send_buffer_space")
505 .inner
506 .datagrams()
507 .send_buffer_space()
508 }
509
510 /// The side of the connection (client or server)
511 pub fn side(&self) -> Side {
512 self.0.state.lock("side").inner.side()
513 }
514
515 /// The peer's UDP address
516 ///
517 /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
518 /// switching to a cellular internet connection.
519 pub fn remote_address(&self) -> SocketAddr {
520 self.0.state.lock("remote_address").inner.remote_address()
521 }
522
523 /// The local IP address which was used when the peer established
524 /// the connection
525 ///
526 /// This can be different from the address the endpoint is bound to, in case
527 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
528 ///
529 /// This will return `None` for clients, or when the platform does not expose this
530 /// information. See quinn_udp's RecvMeta::dst_ip for a list of
531 /// supported platforms when using quinn_udp for I/O, which is the default.
532 pub fn local_ip(&self) -> Option<IpAddr> {
533 self.0.state.lock("local_ip").inner.local_ip()
534 }
535
536 /// Current best estimate of this connection's latency (round-trip-time)
537 pub fn rtt(&self) -> Duration {
538 self.0.state.lock("rtt").inner.rtt()
539 }
540
541 /// Returns connection statistics
542 pub fn stats(&self) -> ConnectionStats {
543 self.0.state.lock("stats").inner.stats()
544 }
545
546 /// Current state of the congestion control algorithm, for debugging purposes
547 pub fn congestion_state(&self) -> Box<dyn Controller> {
548 self.0
549 .state
550 .lock("congestion_state")
551 .inner
552 .congestion_state()
553 .clone_box()
554 }
555
556 /// Parameters negotiated during the handshake
557 ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
563 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
564 self.0
565 .state
566 .lock("handshake_data")
567 .inner
568 .crypto_session()
569 .handshake_data()
570 }
571
572 /// Cryptographic identity of the peer
573 ///
574 /// The dynamic type returned is determined by the configured
575 /// [`Session`](crate::crypto::Session). For the default `rustls` session, the return value can
576 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
577 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
578 self.0
579 .state
580 .lock("peer_identity")
581 .inner
582 .crypto_session()
583 .peer_identity()
584 }
585
586 /// A stable identifier for this connection
587 ///
588 /// Peer addresses and connection IDs can change, but this value will remain
589 /// fixed for the lifetime of the connection.
590 pub fn stable_id(&self) -> usize {
591 self.0.stable_id()
592 }
593
594 /// Update traffic keys spontaneously
595 ///
596 /// This primarily exists for testing purposes.
597 pub fn force_key_update(&self) {
598 self.0
599 .state
600 .lock("force_key_update")
601 .inner
602 .force_key_update()
603 }
604
605 /// Derive keying material from this connection's TLS session secrets.
606 ///
607 /// When both peers call this method with the same `label` and `context`
608 /// arguments and `output` buffers of equal length, they will get the
609 /// same sequence of bytes in `output`. These bytes are cryptographically
610 /// strong and pseudorandom, and are suitable for use as keying material.
611 ///
612 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
613 pub fn export_keying_material(
614 &self,
615 output: &mut [u8],
616 label: &[u8],
617 context: &[u8],
618 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
619 self.0
620 .state
621 .lock("export_keying_material")
622 .inner
623 .crypto_session()
624 .export_keying_material(output, label, context)
625 }
626
627 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
628 ///
629 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
630 /// `count`s increase both minimum and worst-case memory consumption.
631 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
632 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
633 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
634 // May need to send MAX_STREAMS to make progress
635 conn.wake();
636 }
637
638 /// See [`crate::TransportConfig::receive_window()`]
639 pub fn set_receive_window(&self, receive_window: VarInt) {
640 let mut conn = self.0.state.lock("set_receive_window");
641 conn.inner.set_receive_window(receive_window);
642 conn.wake();
643 }
644
645 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
646 ///
647 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
648 /// `count`s increase both minimum and worst-case memory consumption.
649 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
650 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
651 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
652 // May need to send MAX_STREAMS to make progress
653 conn.wake();
654 }
655
656 /// Set up qlog for this connection.
657 #[cfg(feature = "__qlog")]
658 pub fn set_qlog(
659 &mut self,
660 writer: Box<dyn std::io::Write + Send + Sync>,
661 title: Option<String>,
662 description: Option<String>,
663 ) {
664 let mut state = self.0.state.lock("__qlog");
665 state
666 .inner
667 .set_qlog(writer, title, description, Instant::now());
668 }
669}
670
pin_project! {
    /// Future produced by [`Connection::open_uni`]
    ///
    /// Resolves to a [`SendStream`], or to the connection's error if one has been recorded.
    pub struct OpenUni<'a> {
        // Connection the stream is opened on.
        conn: &'a ConnectionRef,
        // Pending wait on `stream_budget_available[Dir::Uni]`; `poll_open` replaces it after a
        // spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
679
680impl Future for OpenUni<'_> {
681 type Output = Result<SendStream, ConnectionError>;
682 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
683 let this = self.project();
684 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
685 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
686 }
687}
688
pin_project! {
    /// Future produced by [`Connection::open_bi`]
    ///
    /// Resolves to both halves of the new stream, or to the connection's error if one has been
    /// recorded.
    pub struct OpenBi<'a> {
        // Connection the stream is opened on.
        conn: &'a ConnectionRef,
        // Pending wait on `stream_budget_available[Dir::Bi]`; `poll_open` replaces it after a
        // spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
697
698impl Future for OpenBi<'_> {
699 type Output = Result<(SendStream, RecvStream), ConnectionError>;
700 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
701 let this = self.project();
702 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
703
704 Poll::Ready(Ok((
705 SendStream::new(conn.clone(), id, is_0rtt),
706 RecvStream::new(conn, id, is_0rtt),
707 )))
708 }
709}
710
/// Shared poll logic for [`OpenUni`] and [`OpenBi`].
///
/// Under the state lock: returns the connection's error if one is recorded, otherwise tries to
/// open a stream in direction `dir`. If no stream ID is available, waits on `notify`.
///
/// On success yields a cloned connection handle, the new stream ID, and a flag that is `true`
/// when the local side is a client that is still handshaking.
fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    // No stream could be opened: arm the notification (still holding the lock) and park.
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}
736
pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    ///
    /// Resolves to a [`RecvStream`], or to the connection's error once no accepted stream is
    /// left to hand out.
    pub struct AcceptUni<'a> {
        // Connection the stream is accepted from.
        conn: &'a ConnectionRef,
        // Pending wait on `stream_incoming[Dir::Uni]`; `poll_accept` replaces it after a
        // spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
745
746impl Future for AcceptUni<'_> {
747 type Output = Result<RecvStream, ConnectionError>;
748
749 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
750 let this = self.project();
751 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
752 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
753 }
754}
755
pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    ///
    /// Resolves to both halves of the accepted stream, or to the connection's error once no
    /// accepted stream is left to hand out.
    pub struct AcceptBi<'a> {
        // Connection the stream is accepted from.
        conn: &'a ConnectionRef,
        // Pending wait on `stream_incoming[Dir::Bi]`; `poll_accept` replaces it after a
        // spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
764
765impl Future for AcceptBi<'_> {
766 type Output = Result<(SendStream, RecvStream), ConnectionError>;
767
768 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
769 let this = self.project();
770 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
771 Poll::Ready(Ok((
772 SendStream::new(conn.clone(), id, is_0rtt),
773 RecvStream::new(conn, id, is_0rtt),
774 )))
775 }
776}
777
/// Shared poll logic for [`AcceptUni`] and [`AcceptBi`].
///
/// Under the state lock: hands out the next already-received stream in direction `dir` if
/// there is one, otherwise returns the connection's error if one is recorded, otherwise waits
/// on `notify`.
///
/// On success yields a cloned connection handle, the stream ID, and a flag that is `true` while
/// the connection is still handshaking.
fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    // Nothing to accept and no error: arm the notification (still holding the lock) and park.
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}
804
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    ///
    /// Resolves to the next buffered datagram, or to the connection's error once no buffered
    /// datagram is left.
    pub struct ReadDatagram<'a> {
        // Connection the datagram is read from.
        conn: &'a ConnectionRef,
        // Pending wait on `datagram_received`; replaced in `poll` after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
813
814impl Future for ReadDatagram<'_> {
815 type Output = Result<Bytes, ConnectionError>;
816 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
817 let mut this = self.project();
818 let mut state = this.conn.state.lock("ReadDatagram::poll");
819 // Check for buffered datagrams before checking `state.error` so that already-received
820 // datagrams, which are necessarily finite, can be drained from a closed connection.
821 match state.inner.datagrams().recv() {
822 Some(x) => {
823 return Poll::Ready(Ok(x));
824 }
825 _ => {
826 if let Some(ref e) = state.error {
827 return Poll::Ready(Err(e.clone()));
828 }
829 }
830 }
831 loop {
832 match this.notify.as_mut().poll(ctx) {
833 // `state` lock ensures we didn't race with readiness
834 Poll::Pending => return Poll::Pending,
835 // Spurious wakeup, get a new future
836 Poll::Ready(()) => this
837 .notify
838 .set(this.conn.shared.datagram_received.notified()),
839 }
840 }
841 }
842}
843
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        // Connection the datagram is sent on.
        conn: &'a ConnectionRef,
        // Payload still to be sent. Taken on each poll and put back when the send is blocked.
        data: Option<Bytes>,
        // Pending wait on `datagrams_unblocked`; replaced in `poll` after a spurious wakeup.
        #[pin]
        notify: Notified<'a>,
    }
}
853
impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;

    /// Attempts to queue the datagram without displacing older ones, parking on the
    /// `datagrams_unblocked` notification while the send is blocked.
    ///
    /// Panics if polled when the payload is absent (the `unwrap` on `data` below).
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        // Brings the protocol-level error variants (`Blocked`, `TooLarge`, ...) into scope.
        use crate::SendDatagramError::*;
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                // Wake the driver so the queued datagram gets transmitted.
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    // Keep the payload for the next poll. This arm never produces an error
                    // value: the loop below only leaves via `return Poll::Pending`.
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}
892
893#[derive(Debug)]
894pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
895
896impl ConnectionRef {
897 #[allow(clippy::too_many_arguments)]
898 fn new(
899 handle: ConnectionHandle,
900 conn: crate::Connection,
901 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
902 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
903 on_handshake_data: oneshot::Sender<()>,
904 on_connected: oneshot::Sender<bool>,
905 socket: Arc<dyn AsyncUdpSocket>,
906 runtime: Arc<dyn Runtime>,
907 ) -> Self {
908 Self(Arc::new(ConnectionInner {
909 state: Mutex::new(State {
910 inner: conn,
911 driver: None,
912 handle,
913 on_handshake_data: Some(on_handshake_data),
914 on_connected: Some(on_connected),
915 connected: false,
916 timer: None,
917 timer_deadline: None,
918 conn_events,
919 endpoint_events,
920 blocked_writers: FxHashMap::default(),
921 blocked_readers: FxHashMap::default(),
922 stopped: FxHashMap::default(),
923 error: None,
924 ref_count: 0,
925 io_poller: socket.clone().create_io_poller(),
926 socket,
927 runtime,
928 send_buffer: Vec::new(),
929 buffered_transmit: None,
930 }),
931 shared: Shared::default(),
932 }))
933 }
934
935 fn stable_id(&self) -> usize {
936 &*self.0 as *const _ as usize
937 }
938}
939
940impl Clone for ConnectionRef {
941 fn clone(&self) -> Self {
942 self.state.lock("clone").ref_count += 1;
943 Self(self.0.clone())
944 }
945}
946
947impl Drop for ConnectionRef {
948 fn drop(&mut self) {
949 let conn = &mut *self.state.lock("drop");
950 if let Some(x) = conn.ref_count.checked_sub(1) {
951 conn.ref_count = x;
952 if x == 0 && !conn.inner.is_closed() {
953 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
954 // not, we can't do any harm. If there were any streams being opened, then either
955 // the connection will be closed for an unrelated reason or a fresh reference will
956 // be constructed for the newly opened stream.
957 conn.implicit_close(&self.shared);
958 }
959 }
960 }
961}
962
963impl std::ops::Deref for ConnectionRef {
964 type Target = ConnectionInner;
965 fn deref(&self) -> &Self::Target {
966 &self.0
967 }
968}
969
970#[derive(Debug)]
971pub(crate) struct ConnectionInner {
972 pub(crate) state: Mutex<State>,
973 pub(crate) shared: Shared,
974}
975
976#[derive(Debug, Default)]
977pub(crate) struct Shared {
978 /// Notified when new streams may be locally initiated due to an increase in stream ID flow
979 /// control budget
980 stream_budget_available: [Notify; 2],
981 /// Notified when the peer has initiated a new stream
982 stream_incoming: [Notify; 2],
983 datagram_received: Notify,
984 datagrams_unblocked: Notify,
985 closed: Notify,
986}
987
988pub(crate) struct State {
989 pub(crate) inner: crate::Connection,
990 driver: Option<Waker>,
991 handle: ConnectionHandle,
992 on_handshake_data: Option<oneshot::Sender<()>>,
993 on_connected: Option<oneshot::Sender<bool>>,
994 connected: bool,
995 timer: Option<Pin<Box<dyn AsyncTimer>>>,
996 timer_deadline: Option<Instant>,
997 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
998 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
999 pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1000 pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1001 pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1002 /// Always set to Some before the connection becomes drained
1003 pub(crate) error: Option<ConnectionError>,
1004 /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1005 ref_count: usize,
1006 socket: Arc<dyn AsyncUdpSocket>,
1007 io_poller: Pin<Box<dyn UdpPoller>>,
1008 runtime: Arc<dyn Runtime>,
1009 send_buffer: Vec<u8>,
1010 /// We buffer a transmit when the underlying I/O would block
1011 buffered_transmit: Option<crate::Transmit>,
1012}
1013
1014impl State {
1015 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1016 let now = self.runtime.now();
1017 let mut transmits = 0;
1018
1019 let max_datagrams = self
1020 .socket
1021 .max_transmit_segments()
1022 .min(MAX_TRANSMIT_SEGMENTS);
1023
1024 loop {
1025 // Retry the last transmit, or get a new one.
1026 let t = match self.buffered_transmit.take() {
1027 Some(t) => t,
1028 None => {
1029 self.send_buffer.clear();
1030 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1031 match self
1032 .inner
1033 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1034 {
1035 Some(t) => {
1036 transmits += match t.segment_size {
1037 None => 1,
1038 Some(s) => t.size.div_ceil(s), // round up
1039 };
1040 t
1041 }
1042 None => break,
1043 }
1044 }
1045 };
1046
1047 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1048 // Retry after a future wakeup
1049 self.buffered_transmit = Some(t);
1050 return Ok(false);
1051 }
1052
1053 let len = t.size;
1054 let retry = match self
1055 .socket
1056 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1057 {
1058 Ok(()) => false,
1059 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1060 Err(e) => return Err(e),
1061 };
1062 if retry {
1063 // We thought the socket was writable, but it wasn't. Retry so that either another
1064 // `poll_writable` call determines that the socket is indeed not writable and
1065 // registers us for a wakeup, or the send succeeds if this really was just a
1066 // transient failure.
1067 self.buffered_transmit = Some(t);
1068 continue;
1069 }
1070
1071 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1072 // TODO: What isn't ideal here yet is that if we don't poll all
1073 // datagrams that could be sent we don't go into the `app_limited`
1074 // state and CWND continues to grow until we get here the next time.
1075 // See https://github.com/quinn-rs/quinn/issues/1126
1076 return Ok(true);
1077 }
1078 }
1079
1080 Ok(false)
1081 }
1082
1083 fn forward_endpoint_events(&mut self) {
1084 while let Some(event) = self.inner.poll_endpoint_events() {
1085 // If the endpoint driver is gone, noop.
1086 let _ = self.endpoint_events.send((self.handle, event));
1087 }
1088 }
1089
1090 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1091 fn process_conn_events(
1092 &mut self,
1093 shared: &Shared,
1094 cx: &mut Context,
1095 ) -> Result<(), ConnectionError> {
1096 loop {
1097 match self.conn_events.poll_recv(cx) {
1098 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1099 self.socket = socket;
1100 self.io_poller = self.socket.clone().create_io_poller();
1101 self.inner.local_address_changed();
1102 }
1103 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1104 self.inner.handle_event(event);
1105 }
1106 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1107 self.close(error_code, reason, shared);
1108 }
1109 Poll::Ready(None) => {
1110 return Err(ConnectionError::TransportError(crate::TransportError {
1111 code: crate::TransportErrorCode::INTERNAL_ERROR,
1112 frame: None,
1113 reason: "endpoint driver future was dropped".to_string(),
1114 }));
1115 }
1116 Poll::Pending => {
1117 return Ok(());
1118 }
1119 }
1120 }
1121 }
1122
1123 fn forward_app_events(&mut self, shared: &Shared) {
1124 while let Some(event) = self.inner.poll() {
1125 use crate::Event::*;
1126 match event {
1127 HandshakeDataReady => {
1128 if let Some(x) = self.on_handshake_data.take() {
1129 let _ = x.send(());
1130 }
1131 }
1132 Connected => {
1133 self.connected = true;
1134 if let Some(x) = self.on_connected.take() {
1135 // We don't care if the on-connected future was dropped
1136 let _ = x.send(self.inner.accepted_0rtt());
1137 }
1138 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1139 // Wake up rejected 0-RTT streams so they can fail immediately with
1140 // `ZeroRttRejected` errors.
1141 wake_all(&mut self.blocked_writers);
1142 wake_all(&mut self.blocked_readers);
1143 wake_all_notify(&mut self.stopped);
1144 }
1145 }
1146 ConnectionLost { reason } => {
1147 self.terminate(reason, shared);
1148 }
1149 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1150 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1151 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1152 }
1153 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1154 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1155 }
1156 DatagramReceived => {
1157 shared.datagram_received.notify_waiters();
1158 }
1159 DatagramsUnblocked => {
1160 shared.datagrams_unblocked.notify_waiters();
1161 }
1162 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1163 Stream(StreamEvent::Available { dir }) => {
1164 // Might mean any number of streams are ready, so we wake up everyone
1165 shared.stream_budget_available[dir as usize].notify_waiters();
1166 }
1167 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1168 Stream(StreamEvent::Stopped { id, .. }) => {
1169 wake_stream_notify(id, &mut self.stopped);
1170 wake_stream(id, &mut self.blocked_writers);
1171 }
1172 }
1173 }
1174 }
1175
1176 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1177 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1178 // timer is registered with the runtime (and check whether it's already
1179 // expired).
1180 match self.inner.poll_timeout() {
1181 Some(deadline) => {
1182 if let Some(delay) = &mut self.timer {
1183 // There is no need to reset the tokio timer if the deadline
1184 // did not change
1185 if self
1186 .timer_deadline
1187 .map(|current_deadline| current_deadline != deadline)
1188 .unwrap_or(true)
1189 {
1190 delay.as_mut().reset(deadline);
1191 }
1192 } else {
1193 self.timer = Some(self.runtime.new_timer(deadline));
1194 }
1195 // Store the actual expiration time of the timer
1196 self.timer_deadline = Some(deadline);
1197 }
1198 None => {
1199 self.timer_deadline = None;
1200 return false;
1201 }
1202 }
1203
1204 if self.timer_deadline.is_none() {
1205 return false;
1206 }
1207
1208 let delay = self
1209 .timer
1210 .as_mut()
1211 .expect("timer must exist in this state")
1212 .as_mut();
1213 if delay.poll(cx).is_pending() {
1214 // Since there wasn't a timeout event, there is nothing new
1215 // for the connection to do
1216 return false;
1217 }
1218
1219 // A timer expired, so the caller needs to check for
1220 // new transmits, which might cause new timers to be set.
1221 self.inner.handle_timeout(self.runtime.now());
1222 self.timer_deadline = None;
1223 true
1224 }
1225
1226 /// Wake up a blocked `Driver` task to process I/O
1227 pub(crate) fn wake(&mut self) {
1228 if let Some(x) = self.driver.take() {
1229 x.wake();
1230 }
1231 }
1232
1233 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1234 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1235 self.error = Some(reason.clone());
1236 if let Some(x) = self.on_handshake_data.take() {
1237 let _ = x.send(());
1238 }
1239 wake_all(&mut self.blocked_writers);
1240 wake_all(&mut self.blocked_readers);
1241 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1242 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1243 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1244 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1245 shared.datagram_received.notify_waiters();
1246 shared.datagrams_unblocked.notify_waiters();
1247 if let Some(x) = self.on_connected.take() {
1248 let _ = x.send(false);
1249 }
1250 wake_all_notify(&mut self.stopped);
1251 shared.closed.notify_waiters();
1252 }
1253
1254 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1255 self.inner.close(self.runtime.now(), error_code, reason);
1256 self.terminate(ConnectionError::LocallyClosed, shared);
1257 self.wake();
1258 }
1259
1260 /// Close for a reason other than the application's explicit request
1261 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1262 self.close(0u32.into(), Bytes::new(), shared);
1263 }
1264
1265 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1266 if self.inner.is_handshaking()
1267 || self.inner.accepted_0rtt()
1268 || self.inner.side().is_server()
1269 {
1270 Ok(())
1271 } else {
1272 Err(())
1273 }
1274 }
1275}
1276
1277impl Drop for State {
1278 fn drop(&mut self) {
1279 if !self.inner.is_drained() {
1280 // Ensure the endpoint can tidy up
1281 let _ = self
1282 .endpoint_events
1283 .send((self.handle, crate::EndpointEvent::drained()));
1284 }
1285 }
1286}
1287
1288impl fmt::Debug for State {
1289 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1290 f.debug_struct("State").field("inner", &self.inner).finish()
1291 }
1292}
1293
1294fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1295 if let Some(waker) = wakers.remove(&stream_id) {
1296 waker.wake();
1297 }
1298}
1299
1300fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1301 wakers.drain().for_each(|(_, waker)| waker.wake())
1302}
1303
1304fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1305 if let Some(notify) = wakers.remove(&stream_id) {
1306 notify.notify_waiters()
1307 }
1308}
1309
1310fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1311 wakers
1312 .drain()
1313 .for_each(|(_, notify)| notify.notify_waiters())
1314}
1315
1316/// Errors that can arise when sending a datagram
1317#[derive(Debug, Error, Clone, Eq, PartialEq)]
1318pub enum SendDatagramError {
1319 /// The peer does not support receiving datagram frames
1320 #[error("datagrams not supported by peer")]
1321 UnsupportedByPeer,
1322 /// Datagram support is disabled locally
1323 #[error("datagram support disabled")]
1324 Disabled,
1325 /// The datagram is larger than the connection can currently accommodate
1326 ///
1327 /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1328 /// exceeded.
1329 #[error("datagram too large")]
1330 TooLarge,
1331 /// The connection was lost
1332 #[error("connection lost")]
1333 ConnectionLost(#[from] ConnectionError),
1334}
1335
/// Upper bound on the number of datagrams produced by a single `drive_transmit` call
///
/// Capping this bounds the CPU time spent generating datagrams in one go and gives other
/// tasks (like receiving ACKs) a chance to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1341
/// Upper bound on the number of datagrams sent in a single transmit
///
/// This may be lower than what the platform supports, to avoid excessive memory
/// allocations when calling `poll_transmit()`. Benchmarks have shown that values around 10
/// are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;