1use std::{
9 any::Any,
10 collections::VecDeque,
11 fmt,
12 future::Future,
13 io,
14 net::{IpAddr, SocketAddr},
15 pin::Pin,
16 sync::{Arc, Weak},
17 task::{Context, Poll, Waker, ready},
18};
19
20use bytes::Bytes;
21use pin_project_lite::pin_project;
22use rustc_hash::FxHashMap;
23use thiserror::Error;
24use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
25use tracing::{Instrument, Span, debug, debug_span, error};
26
27use super::{
28 ConnectionEvent,
29 mutex::Mutex,
30 recv_stream::RecvStream,
31 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSender},
32 send_stream::SendStream,
33 udp_transmit,
34};
35use crate::{
36 ConnectionError, ConnectionHandle, ConnectionStats, DatagramDropStats, Dir, Duration,
37 EndpointEvent, Instant, Side, StreamEvent, StreamId, VarInt,
38 congestion::Controller,
39 path::{Path, PathId, PathSnapshot},
40};
41
/// In-progress connection attempt.
///
/// Polling it as a future waits on the `connected` channel and then yields either a
/// [`Connection`] or the error recorded in the shared state.
#[derive(Debug)]
pub struct Connecting {
    /// Handle to the shared connection state. Becomes `None` once it has been handed out
    /// as a [`Connection`] (by `into_0rtt` or by the future resolving).
    conn: Option<ConnectionRef>,
    /// Receives the value of `accepted_0rtt()` that the driver sends when it observes the
    /// `Connected` event.
    connected: oneshot::Receiver<bool>,
    /// Signalled when the driver observes `HandshakeDataReady`; taken by the first call to
    /// `handshake_data`.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
49
/// Reports whether `error` is an I/O failure that the background driver task logs at
/// debug level instead of error level.
///
/// An error qualifies when its [`io::ErrorKind`] is one of the listed connectivity kinds,
/// or when its raw OS error code is one of a fixed set of numeric values.
fn is_expected_background_io_error(error: &io::Error) -> bool {
    // NOTE(review): presumably errno values for "address not available" / "network
    // unreachable" / "host unreachable" on common Unix targets — confirm per platform.
    const EXPECTED_OS_CODES: [i32; 5] = [49, 51, 65, 101, 113];

    match error.kind() {
        io::ErrorKind::AddrNotAvailable
        | io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionReset
        | io::ErrorKind::HostUnreachable
        | io::ErrorKind::NetworkUnreachable
        | io::ErrorKind::NotConnected
        | io::ErrorKind::TimedOut => return true,
        _ => {}
    }

    // The kind did not match; fall back to the raw OS code, if there is one.
    error
        .raw_os_error()
        .is_some_and(|code| EXPECTED_OS_CODES.contains(&code))
}
62
63impl Connecting {
64 pub(crate) fn new(
65 handle: ConnectionHandle,
66 conn: crate::Connection,
67 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
68 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
69 socket: Arc<dyn AsyncUdpSocket>,
70 runtime: Arc<dyn Runtime>,
71 ) -> Self {
72 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
73 let (on_connected_send, on_connected_recv) = oneshot::channel();
74 let conn = ConnectionRef::new(
75 handle,
76 conn,
77 endpoint_events,
78 conn_events,
79 on_handshake_data_send,
80 on_connected_send,
81 socket,
82 runtime.clone(),
83 );
84
85 let driver = ConnectionDriver(conn.clone());
86 runtime.spawn(Box::pin(
87 async {
88 if let Err(e) = driver.await {
89 if is_expected_background_io_error(&e) {
90 debug!("background I/O path ended: {e}");
91 } else {
92 tracing::error!("I/O error: {e}");
93 }
94 }
95 }
96 .instrument(Span::current()),
97 ));
98
99 Self {
100 conn: Some(conn),
101 connected: on_connected_recv,
102 handshake_data_ready: Some(on_handshake_data_recv),
103 }
104 }
105
106 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
151 let conn = match self.conn.as_mut() {
154 Some(conn) => conn.state.lock("into_0rtt"),
155 None => {
156 return Err(self);
157 }
158 };
159
160 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
161 drop(conn);
162
163 if is_ok {
164 match self.conn.take() {
165 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
166 None => {
167 tracing::error!("Connection state missing during 0-RTT acceptance");
168 Err(self)
169 }
170 }
171 } else {
172 Err(self)
173 }
174 }
175
176 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
183 if let Some(x) = self.handshake_data_ready.take() {
187 let _ = x.await;
188 }
189 let conn = self.conn.as_ref().ok_or_else(|| {
190 tracing::error!("Connection state missing while retrieving handshake data");
191 ConnectionError::LocallyClosed
192 })?;
193 let inner = conn.state.lock("handshake");
194 inner
195 .inner
196 .crypto_session()
197 .handshake_data()
198 .ok_or_else(|| {
199 inner.error.clone().unwrap_or_else(|| {
200 error!("Spurious handshake data ready notification with no error");
201 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
202 "Spurious handshake notification".to_string(),
203 ))
204 })
205 })
206 }
207
208 pub fn local_ip(&self) -> Option<IpAddr> {
220 let conn = self.conn.as_ref()?;
221 let inner = conn.state.lock("local_ip");
222
223 inner.inner.local_ip()
224 }
225
226 pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
230 let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
231 error!("Connection used after yielding Ready");
232 ConnectionError::LocallyClosed
233 })?;
234 Ok(conn_ref.state.lock("remote_address").inner.remote_address())
235 }
236}
237
238impl Future for Connecting {
239 type Output = Result<Connection, ConnectionError>;
240 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
241 Pin::new(&mut self.connected).poll(cx).map(|_| {
242 let conn = self.conn.take().ok_or_else(|| {
243 error!("Connection not available when connecting future resolves");
244 ConnectionError::LocallyClosed
245 })?;
246 let inner = conn.state.lock("connecting");
247 if inner.connected {
248 drop(inner);
249 Ok(Connection(conn))
250 } else {
251 Err(inner.error.clone().unwrap_or_else(|| {
252 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
253 "connection failed without error".to_string(),
254 ))
255 }))
256 }
257 })
258 }
259}
260
261pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
266
267impl Future for ZeroRttAccepted {
268 type Output = bool;
269 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
270 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
271 }
272}
273
274#[must_use = "connection drivers must be spawned for their connections to function"]
285#[derive(Debug)]
286struct ConnectionDriver(ConnectionRef);
287
288impl Future for ConnectionDriver {
289 type Output = Result<(), io::Error>;
290
291 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
292 let conn = &mut *self.0.state.lock("poll");
293
294 let span = debug_span!("drive", id = conn.handle.0);
295 let _guard = span.enter();
296
297 if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
298 conn.terminate(e, &self.0.shared);
299 return Poll::Ready(Ok(()));
300 }
301 let mut keep_going = conn.drive_transmit(cx)?;
302 keep_going |= conn.drive_timer(cx);
305 conn.forward_endpoint_events();
306 conn.forward_app_events(&self.0.shared);
307
308 if conn.connected && !conn.binding_started {
310 if let Some(rt) = crate::trust::global_runtime() {
311 conn.inner.set_delay_new_token_until_binding(true);
313
314 let hl_conn_server = Connection(self.0.clone());
315 let hl_conn_client = hl_conn_server.clone();
316 let store = rt.store.clone();
317 let policy = rt.policy.clone();
318 let signer = rt.local_secret_key.clone();
319 let spki = rt.local_spki.clone();
320 let runtime = conn.runtime.clone();
321
322 if conn.inner.side().is_server() {
323 runtime.spawn(Box::pin(async move {
324 match crate::trust::recv_verify_binding(&hl_conn_server, &*store, &policy)
325 .await
326 {
327 Ok(peer) => {
328 hl_conn_server
329 .0
330 .state
331 .lock("set peer")
332 .inner
333 .set_token_binding_peer_id(peer);
334 hl_conn_server
335 .0
336 .state
337 .lock("allow tokens")
338 .inner
339 .set_delay_new_token_until_binding(false);
340 }
341 Err(_e) => {
342 hl_conn_server.close(0u32.into(), b"channel binding failed");
343 }
344 }
345 }));
346 }
347
348 if conn.inner.side().is_client() {
349 runtime.spawn(Box::pin(async move {
350 if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
351 let _ =
352 crate::trust::send_binding(&hl_conn_client, &exp, &signer, &spki)
353 .await;
354 }
355 }));
356 }
357
358 conn.binding_started = true;
359 }
360 }
361
362 if !conn.inner.is_drained() {
363 if keep_going {
364 cx.waker().wake_by_ref();
366 } else {
367 conn.driver = Some(cx.waker().clone());
368 }
369 return Poll::Pending;
370 }
371 if conn.error.is_none() {
372 unreachable!("drained connections always have an error");
373 }
374 Poll::Ready(Ok(()))
375 }
376}
377
/// Cloneable handle to a connection.
///
/// Wraps a [`ConnectionRef`], so each clone is counted in `State::ref_count`. When the
/// last counted handle is dropped while the connection is not yet closed, the connection
/// is implicitly closed (see `Drop for ConnectionRef`).
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
395
396#[derive(Debug, Clone)]
401pub struct WeakConnectionHandle(Weak<ConnectionInner>);
402
403impl WeakConnectionHandle {
404 pub fn is_alive(&self) -> bool {
406 self.0
407 .upgrade()
408 .is_some_and(|inner| inner.state.lock("weak_is_alive").error.is_none())
409 }
410
411 pub fn upgrade(&self) -> Option<Connection> {
413 let conn = ConnectionRef(self.0.upgrade()?);
414 conn.state.lock("weak_upgrade").ref_count += 1;
415 Some(Connection(conn))
416 }
417
418 pub fn is_same_connection(&self, other: &Self) -> bool {
420 self.0.ptr_eq(&other.0)
421 }
422}
423
424impl Connection {
425 pub fn open_uni(&self) -> OpenUni<'_> {
431 OpenUni {
432 conn: &self.0,
433 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
434 }
435 }
436
437 pub fn open_bi(&self) -> OpenBi<'_> {
448 OpenBi {
449 conn: &self.0,
450 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
451 }
452 }
453
454 pub fn accept_uni(&self) -> AcceptUni<'_> {
456 AcceptUni {
457 conn: &self.0,
458 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
459 }
460 }
461
462 pub fn accept_bi(&self) -> AcceptBi<'_> {
473 AcceptBi {
474 conn: &self.0,
475 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
476 }
477 }
478
479 pub fn read_datagram(&self) -> ReadDatagram<'_> {
481 ReadDatagram {
482 conn: &self.0,
483 notify: self.0.shared.datagram_received.notified(),
484 }
485 }
486
487 pub async fn closed(&self) -> ConnectionError {
493 {
494 let conn = self.0.state.lock("closed");
495 if let Some(error) = conn.error.as_ref() {
496 return error.clone();
497 }
498 self.0.shared.closed.notified()
502 }
503 .await;
504 self.0
505 .state
506 .lock("closed")
507 .error
508 .as_ref()
509 .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
510 .clone()
511 }
512
513 pub fn on_closed(&self) -> impl Future<Output = ConnectionError> + '_ {
515 self.closed()
516 }
517
518 pub fn weak_handle(&self) -> WeakConnectionHandle {
520 WeakConnectionHandle(Arc::downgrade(&self.0.0))
521 }
522
523 pub fn paths(&self) -> Vec<Path> {
529 self.path_snapshot(PathId::PRIMARY)
530 .map(|snapshot| Path::new(self.weak_handle(), PathId::PRIMARY, snapshot))
531 .into_iter()
532 .collect()
533 }
534
535 pub fn path_stats(&self, id: PathId) -> Option<crate::connection::PathStats> {
537 self.path_snapshot(id).map(|snapshot| snapshot.stats)
538 }
539
540 pub(crate) fn path_snapshot(&self, id: PathId) -> Option<PathSnapshot> {
541 if id != PathId::PRIMARY {
542 return None;
543 }
544
545 let conn = self.0.state.lock("path_snapshot");
546 let stats = conn.inner.stats().path;
547 let remote_address = conn.inner.remote_address();
548 let observed_external_addr = conn.inner.observed_address();
549 Some(PathSnapshot {
550 stats,
551 remote_address,
552 observed_external_addr,
553 })
554 }
555
556 pub fn is_alive(&self) -> bool {
562 self.0.state.lock("is_alive").error.is_none()
563 }
564
565 pub fn close_reason(&self) -> Option<ConnectionError> {
569 self.0.state.lock("close_reason").error.clone()
570 }
571
572 pub(crate) fn supports_ack_receive_v2(&self) -> bool {
573 self.0
574 .state
575 .lock("supports_ack_receive_v2")
576 .inner
577 .supports_ack_receive_v2()
578 }
579
580 pub fn wake_transmit(&self) {
615 self.0.state.lock("wake_transmit").wake();
616 }
617
618 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
620 let conn = &mut *self.0.state.lock("close");
621 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
622 }
623
624 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
633 let conn = &mut *self.0.state.lock("send_datagram");
634 if let Some(ref x) = conn.error {
635 return Err(SendDatagramError::ConnectionLost(x.clone()));
636 }
637 use crate::SendDatagramError::*;
638 match conn.inner.datagrams().send(data, true) {
639 Ok(()) => {
640 conn.wake();
641 Ok(())
642 }
643 Err(e) => Err(match e {
644 Blocked(..) => unreachable!(),
645 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
646 Disabled => SendDatagramError::Disabled,
647 TooLarge => SendDatagramError::TooLarge,
648 }),
649 }
650 }
651
652 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
661 SendDatagram {
662 conn: &self.0,
663 data: Some(data),
664 notify: self.0.shared.datagrams_unblocked.notified(),
665 }
666 }
667
668 pub fn max_datagram_size(&self) -> Option<usize> {
680 self.0
681 .state
682 .lock("max_datagram_size")
683 .inner
684 .datagrams()
685 .max_size()
686 }
687
688 pub fn datagram_send_buffer_space(&self) -> usize {
693 self.0
694 .state
695 .lock("datagram_send_buffer_space")
696 .inner
697 .datagrams()
698 .send_buffer_space()
699 }
700
701 pub fn datagram_drop_stats(&self) -> DatagramDropStats {
703 self.0
704 .state
705 .lock("datagram_drop_stats")
706 .inner
707 .stats()
708 .datagram_drops
709 }
710
711 pub fn on_datagram_drop(&self) -> DatagramDrop<'_> {
713 DatagramDrop {
714 conn: &self.0,
715 notify: self.0.shared.datagram_dropped.notified(),
716 }
717 }
718
719 pub fn send_nat_address_advertisement(
721 &self,
722 address: SocketAddr,
723 priority: u32,
724 ) -> Result<u64, crate::ConnectionError> {
725 let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
726 conn.inner.send_nat_address_advertisement(address, priority)
727 }
728
729 pub fn send_nat_address_removal(&self, sequence: u64) -> Result<(), crate::ConnectionError> {
731 let conn = &mut *self.0.state.lock("send_nat_address_removal");
732 conn.inner.send_nat_address_removal(sequence)
733 }
734
735 pub fn send_nat_punch_coordination(
737 &self,
738 paired_with_sequence_number: u64,
739 address: SocketAddr,
740 round: u32,
741 ) -> Result<(), crate::ConnectionError> {
742 let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
743 conn.inner
744 .send_nat_punch_coordination(paired_with_sequence_number, address, round)
745 }
746
747 pub fn send_nat_punch_via_relay(
752 &self,
753 target_peer_id: [u8; 32],
754 our_address: SocketAddr,
755 round: u32,
756 ) -> Result<(), crate::ConnectionError> {
757 let conn = &mut *self.0.state.lock("send_nat_punch_via_relay");
758 conn.inner
759 .send_nat_punch_via_relay(target_peer_id, our_address, round)
760 }
761
762 pub fn nat_traversal_supported(&self) -> bool {
764 self.0
765 .state
766 .lock("nat_traversal_supported")
767 .inner
768 .nat_traversal_supported()
769 }
770
771 pub fn nat_traversal_uses_rfc_frame_format(&self) -> bool {
773 self.0
774 .state
775 .lock("nat_traversal_uses_rfc_frame_format")
776 .inner
777 .nat_traversal_uses_rfc_frame_format()
778 }
779
780 pub fn nat_traversal_accepts_legacy_frame_format(&self) -> bool {
782 self.0
783 .state
784 .lock("nat_traversal_accepts_legacy_frame_format")
785 .inner
786 .nat_traversal_accepts_legacy_frame_format()
787 }
788
789 pub fn side(&self) -> Side {
791 self.0.state.lock("side").inner.side()
792 }
793
794 pub fn remote_address(&self) -> SocketAddr {
799 self.0.state.lock("remote_address").inner.remote_address()
800 }
801
802 pub fn observed_address(&self) -> Option<SocketAddr> {
813 self.0
814 .state
815 .lock("observed_address")
816 .inner
817 .observed_address()
818 }
819
820 pub fn all_observed_addresses(&self) -> Vec<SocketAddr> {
824 self.0
825 .state
826 .lock("all_observed_addresses")
827 .inner
828 .all_observed_addresses()
829 }
830
831 pub(crate) fn observed_address_updated(&self) -> Notified<'_> {
833 self.0.shared.observed_address_updated.notified()
834 }
835
836 pub fn local_ip(&self) -> Option<IpAddr> {
846 self.0.state.lock("local_ip").inner.local_ip()
847 }
848
849 pub fn rtt(&self) -> Duration {
851 self.0.state.lock("rtt").inner.rtt()
852 }
853
854 pub fn stats(&self) -> ConnectionStats {
856 self.0.state.lock("stats").inner.stats()
857 }
858
859 pub fn congestion_state(&self) -> Box<dyn Controller> {
861 self.0
862 .state
863 .lock("congestion_state")
864 .inner
865 .congestion_state()
866 .clone_box()
867 }
868
869 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
877 self.0
878 .state
879 .lock("handshake_data")
880 .inner
881 .crypto_session()
882 .handshake_data()
883 }
884
885 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
891 self.0
892 .state
893 .lock("peer_identity")
894 .inner
895 .crypto_session()
896 .peer_identity()
897 }
898
899 pub fn stable_id(&self) -> usize {
904 self.0.stable_id()
905 }
906
907 pub fn is_pqc(&self) -> bool {
912 let state = self.0.state.lock("is_pqc");
913 state.inner.is_pqc()
914 }
915
916 pub fn debug_kem_only(&self) -> bool {
920 crate::crypto::rustls::debug_kem_only_enabled()
921 }
922
923 pub fn force_key_update(&self) {
927 self.0
928 .state
929 .lock("force_key_update")
930 .inner
931 .force_key_update()
932 }
933
934 pub fn export_keying_material(
943 &self,
944 output: &mut [u8],
945 label: &[u8],
946 context: &[u8],
947 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
948 self.0
949 .state
950 .lock("export_keying_material")
951 .inner
952 .crypto_session()
953 .export_keying_material(output, label, context)
954 }
955
956 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
961 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
962 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
963 conn.wake();
965 }
966
967 pub fn set_receive_window(&self, receive_window: VarInt) {
969 let mut conn = self.0.state.lock("set_receive_window");
970 conn.inner.set_receive_window(receive_window);
971 conn.wake();
972 }
973
974 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
979 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
980 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
981 conn.wake();
983 }
984
985 #[cfg(feature = "__qlog")]
987 pub fn set_qlog(
988 &mut self,
989 writer: Box<dyn std::io::Write + Send + Sync>,
990 title: Option<String>,
991 description: Option<String>,
992 ) {
993 let mut state = self.0.state.lock("__qlog");
994 state
995 .inner
996 .set_qlog(writer, title, description, Instant::now());
997 }
998}
999
1000pin_project! {
1001 pub struct OpenUni<'a> {
1003 conn: &'a ConnectionRef,
1004 #[pin]
1005 notify: Notified<'a>,
1006 }
1007}
1008
1009impl Future for OpenUni<'_> {
1010 type Output = Result<SendStream, ConnectionError>;
1011 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1012 let this = self.project();
1013 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
1014 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
1015 }
1016}
1017
1018pin_project! {
1019 pub struct OpenBi<'a> {
1021 conn: &'a ConnectionRef,
1022 #[pin]
1023 notify: Notified<'a>,
1024 }
1025}
1026
1027impl Future for OpenBi<'_> {
1028 type Output = Result<(SendStream, RecvStream), ConnectionError>;
1029 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1030 let this = self.project();
1031 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
1032
1033 Poll::Ready(Ok((
1034 SendStream::new(conn.clone(), id, is_0rtt),
1035 RecvStream::new(conn, id, is_0rtt),
1036 )))
1037 }
1038}
1039
1040fn poll_open<'a>(
1041 ctx: &mut Context<'_>,
1042 conn: &'a ConnectionRef,
1043 mut notify: Pin<&mut Notified<'a>>,
1044 dir: Dir,
1045) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1046 let mut state = conn.state.lock("poll_open");
1047 if let Some(ref e) = state.error {
1048 return Poll::Ready(Err(e.clone()));
1049 } else if let Some(id) = state.inner.streams().open(dir) {
1050 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
1051 drop(state); return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1053 }
1054 loop {
1055 match notify.as_mut().poll(ctx) {
1056 Poll::Pending => return Poll::Pending,
1058 Poll::Ready(()) => {
1060 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
1061 }
1062 }
1063 }
1064}
1065
1066pin_project! {
1067 pub struct AcceptUni<'a> {
1069 conn: &'a ConnectionRef,
1070 #[pin]
1071 notify: Notified<'a>,
1072 }
1073}
1074
1075impl Future for AcceptUni<'_> {
1076 type Output = Result<RecvStream, ConnectionError>;
1077
1078 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1079 let this = self.project();
1080 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1081 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1082 }
1083}
1084
1085pin_project! {
1086 pub struct AcceptBi<'a> {
1088 conn: &'a ConnectionRef,
1089 #[pin]
1090 notify: Notified<'a>,
1091 }
1092}
1093
1094impl Future for AcceptBi<'_> {
1095 type Output = Result<(SendStream, RecvStream), ConnectionError>;
1096
1097 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1098 let this = self.project();
1099 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1100 Poll::Ready(Ok((
1101 SendStream::new(conn.clone(), id, is_0rtt),
1102 RecvStream::new(conn, id, is_0rtt),
1103 )))
1104 }
1105}
1106
1107fn poll_accept<'a>(
1108 ctx: &mut Context<'_>,
1109 conn: &'a ConnectionRef,
1110 mut notify: Pin<&mut Notified<'a>>,
1111 dir: Dir,
1112) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1113 let mut state = conn.state.lock("poll_accept");
1114 if let Some(id) = state.inner.streams().accept(dir) {
1117 let is_0rtt = state.inner.is_handshaking();
1118 state.wake(); drop(state); return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1121 } else if let Some(ref e) = state.error {
1122 return Poll::Ready(Err(e.clone()));
1123 }
1124 loop {
1125 match notify.as_mut().poll(ctx) {
1126 Poll::Pending => return Poll::Pending,
1128 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1130 }
1131 }
1132}
1133
1134pin_project! {
1135 pub struct ReadDatagram<'a> {
1137 conn: &'a ConnectionRef,
1138 #[pin]
1139 notify: Notified<'a>,
1140 }
1141}
1142
1143impl Future for ReadDatagram<'_> {
1144 type Output = Result<Bytes, ConnectionError>;
1145 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1146 let mut this = self.project();
1147 let mut state = this.conn.state.lock("ReadDatagram::poll");
1148 match state.inner.datagrams().recv() {
1151 Some(x) => {
1152 return Poll::Ready(Ok(x));
1153 }
1154 _ => {
1155 if let Some(ref e) = state.error {
1156 return Poll::Ready(Err(e.clone()));
1157 }
1158 }
1159 }
1160 loop {
1161 match this.notify.as_mut().poll(ctx) {
1162 Poll::Pending => return Poll::Pending,
1164 Poll::Ready(()) => this
1166 .notify
1167 .set(this.conn.shared.datagram_received.notified()),
1168 }
1169 }
1170 }
1171}
1172
1173pin_project! {
1174 pub struct DatagramDrop<'a> {
1176 conn: &'a ConnectionRef,
1177 #[pin]
1178 notify: Notified<'a>,
1179 }
1180}
1181
1182impl Future for DatagramDrop<'_> {
1183 type Output = Result<DatagramDropStats, ConnectionError>;
1184
1185 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1186 let mut this = self.project();
1187 let mut state = this.conn.state.lock("DatagramDrop::poll");
1188 if let Some(drop) = state.datagram_drop_events.pop_front() {
1189 return Poll::Ready(Ok(drop));
1190 }
1191 if let Some(ref e) = state.error {
1192 return Poll::Ready(Err(e.clone()));
1193 }
1194 loop {
1195 match this.notify.as_mut().poll(ctx) {
1196 Poll::Pending => return Poll::Pending,
1198 Poll::Ready(()) => this
1199 .notify
1200 .set(this.conn.shared.datagram_dropped.notified()),
1201 }
1202 }
1203 }
1204}
1205
1206pin_project! {
1207 pub struct SendDatagram<'a> {
1209 conn: &'a ConnectionRef,
1210 data: Option<Bytes>,
1211 #[pin]
1212 notify: Notified<'a>,
1213 }
1214}
1215
1216impl Future for SendDatagram<'_> {
1217 type Output = Result<(), SendDatagramError>;
1218 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1219 let mut this = self.project();
1220 let mut state = this.conn.state.lock("SendDatagram::poll");
1221 if let Some(ref e) = state.error {
1222 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1223 }
1224 use crate::SendDatagramError::*;
1225 match state.inner.datagrams().send(
1226 this.data.take().ok_or_else(|| {
1227 error!("SendDatagram future polled without data");
1228 SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
1229 })?,
1230 false,
1231 ) {
1232 Ok(()) => {
1233 state.wake();
1234 Poll::Ready(Ok(()))
1235 }
1236 Err(e) => Poll::Ready(Err(match e {
1237 Blocked(data) => {
1238 this.data.replace(data);
1239 loop {
1240 match this.notify.as_mut().poll(ctx) {
1241 Poll::Pending => return Poll::Pending,
1242 Poll::Ready(()) => this
1244 .notify
1245 .set(this.conn.shared.datagrams_unblocked.notified()),
1246 }
1247 }
1248 }
1249 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1250 Disabled => SendDatagramError::Disabled,
1251 TooLarge => SendDatagramError::TooLarge,
1252 })),
1253 }
1254 }
1255}
1256
1257#[derive(Debug)]
1258pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1259
1260impl ConnectionRef {
1261 #[allow(clippy::too_many_arguments)]
1262 fn new(
1263 handle: ConnectionHandle,
1264 conn: crate::Connection,
1265 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1266 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1267 on_handshake_data: oneshot::Sender<()>,
1268 on_connected: oneshot::Sender<bool>,
1269 socket: Arc<dyn AsyncUdpSocket>,
1270 runtime: Arc<dyn Runtime>,
1271 ) -> Self {
1272 Self(Arc::new(ConnectionInner {
1273 state: Mutex::new(State {
1274 inner: conn,
1275 driver: None,
1276 handle,
1277 on_handshake_data: Some(on_handshake_data),
1278 on_connected: Some(on_connected),
1279 connected: false,
1280 timer: None,
1281 timer_deadline: None,
1282 conn_events,
1283 endpoint_events,
1284 blocked_writers: FxHashMap::default(),
1285 blocked_readers: FxHashMap::default(),
1286 stopped: FxHashMap::default(),
1287 error: None,
1288 ref_count: 0,
1289 datagram_drop_events: VecDeque::new(),
1290 udp_sender: socket.create_sender(),
1291 socket,
1292 runtime,
1293 send_buffer: Vec::new(),
1294 buffered_transmit: None,
1295 binding_started: false,
1296 }),
1297 shared: Shared::default(),
1298 }))
1299 }
1300
1301 fn stable_id(&self) -> usize {
1302 &*self.0 as *const _ as usize
1303 }
1304}
1305
1306impl Clone for ConnectionRef {
1307 fn clone(&self) -> Self {
1308 self.state.lock("clone").ref_count += 1;
1309 Self(self.0.clone())
1310 }
1311}
1312
1313impl Drop for ConnectionRef {
1314 fn drop(&mut self) {
1315 let conn = &mut *self.state.lock("drop");
1316 if let Some(x) = conn.ref_count.checked_sub(1) {
1317 conn.ref_count = x;
1318 if x == 0 && !conn.inner.is_closed() {
1319 conn.implicit_close(&self.shared);
1324 }
1325 }
1326 }
1327}
1328
1329impl std::ops::Deref for ConnectionRef {
1330 type Target = ConnectionInner;
1331 fn deref(&self) -> &Self::Target {
1332 &self.0
1333 }
1334}
1335
/// Data shared by every handle to a connection and by its driver.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// Mutable connection state, guarded by a mutex.
    pub(crate) state: Mutex<State>,
    /// Notification primitives that are used without taking the `state` lock.
    pub(crate) shared: Shared,
}
1341
/// Notification primitives shared between the driver and the futures handed to users.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified on `StreamEvent::Available`; indexed by `Dir as usize`. Awaited by the
    /// stream-opening futures.
    stream_budget_available: [Notify; 2],
    /// Notified on `StreamEvent::Opened`; indexed by `Dir as usize`. Awaited by the
    /// stream-accepting futures.
    stream_incoming: [Notify; 2],
    /// Notified on the `DatagramReceived` event.
    datagram_received: Notify,
    /// Notified on the `DatagramsUnblocked` event.
    datagrams_unblocked: Notify,
    /// Notified after a `DatagramDropped` event has been queued.
    datagram_dropped: Notify,
    /// Notified when handling a protocol event changes the set of observed addresses.
    observed_address_updated: Notify,
    /// Awaited by `Connection::closed`.
    // NOTE(review): the code that signals this is outside the visible part of the file.
    closed: Notify,
}
1355
/// Mutable per-connection state, always accessed through `ConnectionInner::state`.
pub(crate) struct State {
    /// Protocol-level connection that the public accessors delegate to.
    pub(crate) inner: crate::Connection,
    /// Waker stored by the driver when it returns `Pending` with no immediate work left.
    driver: Option<Waker>,
    /// Handle sent alongside every forwarded endpoint event.
    handle: ConnectionHandle,
    /// Fired once when the `HandshakeDataReady` event is seen.
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Fired once when the `Connected` event is seen, carrying `accepted_0rtt()`.
    on_connected: Option<oneshot::Sender<bool>>,
    /// Set to `true` when the `Connected` event is seen.
    connected: bool,
    /// Timer created from `runtime` for the connection's next deadline.
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    /// Deadline the timer was last armed for; `None` when no timeout is pending.
    timer_deadline: Option<Instant>,
    /// Events delivered to this connection; polled by `process_conn_events`.
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Channel on which endpoint events are forwarded, tagged with `handle`.
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Wakers notified on `StreamEvent::Writable` and `StreamEvent::Stopped`.
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Wakers notified on `StreamEvent::Readable`.
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Notifiers signalled on `StreamEvent::Finished` and `StreamEvent::Stopped`.
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Error recorded for the connection. The driver asserts it is set by the time the
    /// connection is drained.
    pub(crate) error: Option<ConnectionError>,
    /// Number of live counted handles: incremented by `ConnectionRef::clone` and
    /// `WeakConnectionHandle::upgrade`, decremented by `Drop for ConnectionRef`.
    ref_count: usize,
    /// Drop events queued on `DatagramDropped`, consumed by the `DatagramDrop` future.
    datagram_drop_events: VecDeque<DatagramDropStats>,
    /// Socket in use; replaced when a `Rebind` event arrives.
    socket: Arc<dyn AsyncUdpSocket>,
    /// Sender created from `socket`; recreated on `Rebind`.
    udp_sender: Pin<Box<dyn UdpSender>>,
    /// Runtime used for the current time, timers and spawning tasks.
    runtime: Arc<dyn Runtime>,
    /// Scratch buffer that `poll_transmit` writes outgoing packet data into.
    send_buffer: Vec<u8>,
    /// Transmit kept for retry when the last `poll_send` returned `Pending`; its payload
    /// remains in `send_buffer`.
    buffered_transmit: Option<crate::Transmit>,
    /// Set once the channel-binding tasks have been spawned by the driver.
    binding_started: bool,
}
1384
1385impl State {
1386 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1387 let now = self.runtime.now();
1388 let mut transmits = 0;
1389
1390 let max_datagrams = self
1391 .udp_sender
1392 .max_transmit_segments()
1393 .min(MAX_TRANSMIT_SEGMENTS);
1394
1395 loop {
1396 let t = match self.buffered_transmit.take() {
1398 Some(t) => t,
1399 None => {
1400 self.send_buffer.clear();
1401 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1402 match self
1403 .inner
1404 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1405 {
1406 Some(t) => {
1407 transmits += match t.segment_size {
1408 None => 1,
1409 Some(s) => t.size.div_ceil(s), };
1411 t
1412 }
1413 None => break,
1414 }
1415 }
1416 };
1417
1418 let segment_count = match t.segment_size {
1429 Some(s) if s > 0 => t.size.div_ceil(s),
1430 _ => 1,
1431 };
1432 let is_gso_bundle = segment_count > 1;
1433
1434 let len = t.size;
1435 match self
1436 .udp_sender
1437 .as_mut()
1438 .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1439 {
1440 Poll::Pending => {
1441 self.buffered_transmit = Some(t);
1442 return Ok(false);
1443 }
1444 Poll::Ready(Ok(())) => {
1445 if is_gso_bundle {
1446 crate::diagnostics::gso_diagnostics()
1447 .record_bundle_submitted(segment_count);
1448 }
1449 }
1450 Poll::Ready(Err(e)) => {
1451 if is_gso_bundle {
1457 crate::diagnostics::gso_diagnostics()
1458 .record_bundle_submitted(segment_count);
1459 crate::diagnostics::gso_diagnostics().record_bundle_partial_send();
1460 }
1461 return Err(e);
1462 }
1463 }
1464
1465 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1466 return Ok(true);
1471 }
1472 }
1473
1474 Ok(false)
1475 }
1476
1477 fn forward_endpoint_events(&mut self) {
1478 while let Some(event) = self.inner.poll_endpoint_events() {
1479 let _ = self.endpoint_events.send((self.handle, event));
1481 }
1482 }
1483
1484 fn process_conn_events(
1486 &mut self,
1487 shared: &Shared,
1488 cx: &mut Context,
1489 ) -> Result<(), ConnectionError> {
1490 loop {
1491 match self.conn_events.poll_recv(cx) {
1492 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1493 self.socket = socket;
1494 self.udp_sender = self.socket.create_sender();
1495 self.inner.local_address_changed();
1496 }
1497 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1498 let mut observed_before = self.inner.all_observed_addresses();
1499 observed_before.sort_unstable();
1500 observed_before.dedup();
1501
1502 self.inner.handle_event(event);
1503
1504 let mut observed_after = self.inner.all_observed_addresses();
1505 observed_after.sort_unstable();
1506 observed_after.dedup();
1507 if observed_before != observed_after {
1508 shared.observed_address_updated.notify_one();
1509 }
1510 }
1511 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1512 self.close(error_code, reason, shared);
1513 }
1514 Poll::Ready(None) => {
1515 return Err(ConnectionError::TransportError(crate::TransportError {
1516 code: crate::TransportErrorCode::INTERNAL_ERROR,
1517 frame: None,
1518 reason: "endpoint driver future was dropped".to_string(),
1519 }));
1520 }
1521 Poll::Pending => {
1522 return Ok(());
1523 }
1524 }
1525 }
1526 }
1527
1528 fn forward_app_events(&mut self, shared: &Shared) {
1529 while let Some(event) = self.inner.poll() {
1530 use crate::Event::*;
1531 match event {
1532 HandshakeDataReady => {
1533 if let Some(x) = self.on_handshake_data.take() {
1534 let _ = x.send(());
1535 }
1536 }
1537 Connected => {
1538 self.connected = true;
1539 if let Some(x) = self.on_connected.take() {
1540 let _ = x.send(self.inner.accepted_0rtt());
1542 }
1543 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1544 wake_all(&mut self.blocked_writers);
1547 wake_all(&mut self.blocked_readers);
1548 wake_all_notify(&mut self.stopped);
1549 }
1550 }
1551 ConnectionLost { reason } => {
1552 self.terminate(reason, shared);
1553 }
1554 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1555 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1556 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1557 }
1558 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1559 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1560 }
1561 DatagramReceived => {
1562 shared.datagram_received.notify_waiters();
1563 }
1564 DatagramsUnblocked => {
1565 shared.datagrams_unblocked.notify_waiters();
1566 }
1567 DatagramDropped(drop) => {
1568 tracing::debug!(
1570 datagrams = drop.datagrams,
1571 bytes = drop.bytes,
1572 "datagrams dropped due to receive buffer overflow"
1573 );
1574 self.datagram_drop_events.push_back(drop);
1575 shared.datagram_dropped.notify_waiters();
1576 }
1577 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1578 Stream(StreamEvent::Available { dir }) => {
1579 shared.stream_budget_available[dir as usize].notify_waiters();
1581 }
1582 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1583 Stream(StreamEvent::Stopped { id, .. }) => {
1584 wake_stream_notify(id, &mut self.stopped);
1585 wake_stream(id, &mut self.blocked_writers);
1586 }
1587 }
1588 }
1589 }
1590
1591 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1592 match self.inner.poll_timeout() {
1596 Some(deadline) => {
1597 if let Some(delay) = &mut self.timer {
1598 if self
1601 .timer_deadline
1602 .map(|current_deadline| current_deadline != deadline)
1603 .unwrap_or(true)
1604 {
1605 delay.as_mut().reset(deadline);
1606 }
1607 } else {
1608 self.timer = Some(self.runtime.new_timer(deadline));
1609 }
1610 self.timer_deadline = Some(deadline);
1612 }
1613 None => {
1614 self.timer_deadline = None;
1615 return false;
1616 }
1617 }
1618
1619 if self.timer_deadline.is_none() {
1620 return false;
1621 }
1622
1623 let delay = match self.timer.as_mut() {
1624 Some(timer) => timer.as_mut(),
1625 None => {
1626 error!("Timer missing in state where it should exist");
1627 return false;
1628 }
1629 };
1630 if delay.poll(cx).is_pending() {
1631 return false;
1634 }
1635
1636 self.inner.handle_timeout(self.runtime.now());
1639 self.timer_deadline = None;
1640 true
1641 }
1642
1643 pub(crate) fn wake(&mut self) {
1645 if let Some(x) = self.driver.take() {
1646 x.wake();
1647 }
1648 }
1649
1650 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1652 self.error = Some(reason.clone());
1653 if let Some(x) = self.on_handshake_data.take() {
1654 let _ = x.send(());
1655 }
1656 wake_all(&mut self.blocked_writers);
1657 wake_all(&mut self.blocked_readers);
1658 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1659 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1660 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1661 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1662 shared.datagram_received.notify_waiters();
1663 shared.datagrams_unblocked.notify_waiters();
1664 shared.datagram_dropped.notify_waiters();
1665 shared.observed_address_updated.notify_waiters();
1666 if let Some(x) = self.on_connected.take() {
1667 let _ = x.send(false);
1668 }
1669 wake_all_notify(&mut self.stopped);
1670 shared.closed.notify_waiters();
1671 }
1672
1673 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1674 self.inner.close(self.runtime.now(), error_code, reason);
1675 self.terminate(ConnectionError::LocallyClosed, shared);
1676 self.wake();
1677 }
1678
1679 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1681 self.close(0u32.into(), Bytes::new(), shared);
1682 }
1683
1684 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1685 if self.inner.is_handshaking()
1686 || self.inner.accepted_0rtt()
1687 || self.inner.side().is_server()
1688 {
1689 Ok(())
1690 } else {
1691 Err(())
1692 }
1693 }
1694}
1695
1696impl Drop for State {
1697 fn drop(&mut self) {
1698 if !self.inner.is_drained() {
1699 let _ = self
1701 .endpoint_events
1702 .send((self.handle, crate::EndpointEvent::drained()));
1703 }
1704 }
1705}
1706
1707impl fmt::Debug for State {
1708 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1709 f.debug_struct("State").field("inner", &self.inner).finish()
1710 }
1711}
1712
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rustls::pki_types::{CertificateDer, PrivateKeyDer};
    use tokio::time::{Duration, timeout};

    use crate::config::{ClientConfig, ServerConfig};
    use crate::high_level::Endpoint;

    /// Upper bound applied to every individual wait in the test below.
    const STEP_TIMEOUT: Duration = Duration::from_secs(10);

    /// Pause between two checks of whether the weak handle can still be upgraded.
    const RELEASE_POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Creates a self-signed certificate for `localhost` and returns it as a one-element
    /// chain together with its PKCS#8 private key.
    fn gen_self_signed_cert() -> (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>) {
        let subject_names = vec!["localhost".to_string()];
        let generated =
            rcgen::generate_simple_self_signed(subject_names).expect("generate self-signed cert");
        let certificate = CertificateDer::from(generated.cert);
        let private_key = PrivateKeyDer::Pkcs8(generated.signing_key.serialize_der().into());
        (vec![certificate], private_key)
    }

    /// Builds a client configuration that trusts exactly the certificates in `chain`.
    fn client_config(chain: &[CertificateDer<'static>]) -> ClientConfig {
        let mut trusted = rustls::RootCertStore::empty();
        for certificate in chain {
            trusted.add(certificate.clone()).expect("add root");
        }
        ClientConfig::with_root_certificates(Arc::new(trusted)).expect("client config")
    }

    /// A weak handle can be upgraded while the connection is live, reports not-alive
    /// after the connection is closed, and stops upgrading once the strong handles are
    /// dropped.
    #[tokio::test]
    async fn weak_connection_handle_does_not_keep_connection_alive() {
        // Installing the provider fails if one is already installed; that is fine.
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

        let (chain, key) = gen_self_signed_cert();
        let server_config =
            ServerConfig::with_single_cert(chain.clone(), key).expect("server config");
        let server =
            Endpoint::server(server_config, ([127, 0, 0, 1], 0).into()).expect("server endpoint");
        let server_addr = server.local_addr().expect("server local addr");

        // Server side: accept one connection, finish the handshake, wait for it to close.
        let server_task = tokio::spawn(async move {
            let incoming = timeout(STEP_TIMEOUT, server.accept())
                .await
                .expect("server accept wait")
                .expect("incoming connection");
            let accepted = timeout(STEP_TIMEOUT, incoming)
                .await
                .expect("server handshake wait")
                .expect("server handshake");
            let _ = timeout(STEP_TIMEOUT, accepted.closed()).await;
        });

        // Client side: connect to the server over loopback.
        let mut client = Endpoint::client(([127, 0, 0, 1], 0).into()).expect("client endpoint");
        client.set_default_client_config(client_config(&chain));
        let connecting = client
            .connect(server_addr, "localhost")
            .expect("connect start");
        let conn = timeout(STEP_TIMEOUT, connecting)
            .await
            .expect("client handshake wait")
            .expect("client handshake");

        let weak = conn.weak_handle();
        assert!(weak.is_alive());

        // While the connection is live the weak handle upgrades to the same connection.
        let strong = weak.upgrade().expect("weak upgrade while live");
        assert!(weak.is_same_connection(&strong.weak_handle()));
        drop(strong);

        conn.close(0u32.into(), b"x0x-0045");
        let _ = timeout(STEP_TIMEOUT, conn.on_closed())
            .await
            .expect("client closed");
        assert!(!weak.is_alive());

        drop(conn);
        drop(client);

        // With every strong handle gone the weak handle must eventually stop upgrading.
        let released = async {
            loop {
                if weak.upgrade().is_none() {
                    break;
                }
                tokio::time::sleep(RELEASE_POLL_INTERVAL).await;
            }
        };
        timeout(STEP_TIMEOUT, released)
            .await
            .expect("weak handle released after strong handles dropped");

        server_task.await.expect("server task");
    }
}
1799
1800fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1801 if let Some(waker) = wakers.remove(&stream_id) {
1802 waker.wake();
1803 }
1804}
1805
1806fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1807 wakers.drain().for_each(|(_, waker)| waker.wake())
1808}
1809
1810fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1811 if let Some(notify) = wakers.remove(&stream_id) {
1812 notify.notify_waiters()
1813 }
1814}
1815
1816fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1817 wakers
1818 .drain()
1819 .for_each(|(_, notify)| notify.notify_waiters())
1820}
1821
/// Errors that can be reported when sending a datagram.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support datagrams.
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled.
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is too large to be sent.
    // NOTE(review): the exact size limit that applies is not visible here — confirm
    // against the code that produces this variant.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost; carries the underlying [`ConnectionError`], from which
    /// this variant can be converted via `From`.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1841
// NOTE(review): presumably an upper bound on how many datagrams are produced per
// transmit pass — the use site is outside this view; confirm before relying on it.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

// NOTE(review): presumably an upper bound on how many segments are batched into a single
// transmit — the use site is outside this view; confirm before relying on it.
const MAX_TRANSMIT_SEGMENTS: usize = 10;