1use std::{
9 any::Any,
10 collections::VecDeque,
11 fmt,
12 future::Future,
13 io,
14 net::{IpAddr, SocketAddr},
15 pin::Pin,
16 sync::Arc,
17 task::{Context, Poll, Waker, ready},
18};
19
20use bytes::Bytes;
21use pin_project_lite::pin_project;
22use rustc_hash::FxHashMap;
23use thiserror::Error;
24use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
25use tracing::{Instrument, Span, debug, debug_span, error};
26
27use super::{
28 ConnectionEvent,
29 mutex::Mutex,
30 recv_stream::RecvStream,
31 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
32 send_stream::SendStream,
33 udp_transmit,
34};
35use crate::{
36 ConnectionError, ConnectionHandle, ConnectionStats, DatagramDropStats, Dir, Duration,
37 EndpointEvent, Instant, Side, StreamEvent, StreamId, VarInt, congestion::Controller,
38};
39
40#[derive(Debug)]
42pub struct Connecting {
43 conn: Option<ConnectionRef>,
44 connected: oneshot::Receiver<bool>,
45 handshake_data_ready: Option<oneshot::Receiver<()>>,
46}
47
48fn is_expected_background_io_error(error: &io::Error) -> bool {
49 matches!(
50 error.kind(),
51 io::ErrorKind::AddrNotAvailable
52 | io::ErrorKind::ConnectionRefused
53 | io::ErrorKind::ConnectionReset
54 | io::ErrorKind::HostUnreachable
55 | io::ErrorKind::NetworkUnreachable
56 | io::ErrorKind::NotConnected
57 | io::ErrorKind::TimedOut
58 ) || matches!(error.raw_os_error(), Some(49 | 51 | 65 | 101 | 113))
59}
60
61impl Connecting {
62 pub(crate) fn new(
63 handle: ConnectionHandle,
64 conn: crate::Connection,
65 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
66 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
67 socket: Arc<dyn AsyncUdpSocket>,
68 runtime: Arc<dyn Runtime>,
69 ) -> Self {
70 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
71 let (on_connected_send, on_connected_recv) = oneshot::channel();
72 let conn = ConnectionRef::new(
73 handle,
74 conn,
75 endpoint_events,
76 conn_events,
77 on_handshake_data_send,
78 on_connected_send,
79 socket,
80 runtime.clone(),
81 );
82
83 let driver = ConnectionDriver(conn.clone());
84 runtime.spawn(Box::pin(
85 async {
86 if let Err(e) = driver.await {
87 if is_expected_background_io_error(&e) {
88 debug!("background I/O path ended: {e}");
89 } else {
90 tracing::error!("I/O error: {e}");
91 }
92 }
93 }
94 .instrument(Span::current()),
95 ));
96
97 Self {
98 conn: Some(conn),
99 connected: on_connected_recv,
100 handshake_data_ready: Some(on_handshake_data_recv),
101 }
102 }
103
104 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
149 let conn = match self.conn.as_mut() {
152 Some(conn) => conn.state.lock("into_0rtt"),
153 None => {
154 return Err(self);
155 }
156 };
157
158 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
159 drop(conn);
160
161 if is_ok {
162 match self.conn.take() {
163 Some(conn) => Ok((Connection(conn), ZeroRttAccepted(self.connected))),
164 None => {
165 tracing::error!("Connection state missing during 0-RTT acceptance");
166 Err(self)
167 }
168 }
169 } else {
170 Err(self)
171 }
172 }
173
174 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
181 if let Some(x) = self.handshake_data_ready.take() {
185 let _ = x.await;
186 }
187 let conn = self.conn.as_ref().ok_or_else(|| {
188 tracing::error!("Connection state missing while retrieving handshake data");
189 ConnectionError::LocallyClosed
190 })?;
191 let inner = conn.state.lock("handshake");
192 inner
193 .inner
194 .crypto_session()
195 .handshake_data()
196 .ok_or_else(|| {
197 inner.error.clone().unwrap_or_else(|| {
198 error!("Spurious handshake data ready notification with no error");
199 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
200 "Spurious handshake notification".to_string(),
201 ))
202 })
203 })
204 }
205
206 pub fn local_ip(&self) -> Option<IpAddr> {
218 let conn = self.conn.as_ref()?;
219 let inner = conn.state.lock("local_ip");
220
221 inner.inner.local_ip()
222 }
223
224 pub fn remote_address(&self) -> Result<SocketAddr, ConnectionError> {
228 let conn_ref: &ConnectionRef = self.conn.as_ref().ok_or_else(|| {
229 error!("Connection used after yielding Ready");
230 ConnectionError::LocallyClosed
231 })?;
232 Ok(conn_ref.state.lock("remote_address").inner.remote_address())
233 }
234}
235
236impl Future for Connecting {
237 type Output = Result<Connection, ConnectionError>;
238 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
239 Pin::new(&mut self.connected).poll(cx).map(|_| {
240 let conn = self.conn.take().ok_or_else(|| {
241 error!("Connection not available when connecting future resolves");
242 ConnectionError::LocallyClosed
243 })?;
244 let inner = conn.state.lock("connecting");
245 if inner.connected {
246 drop(inner);
247 Ok(Connection(conn))
248 } else {
249 Err(inner.error.clone().unwrap_or_else(|| {
250 ConnectionError::TransportError(crate::transport_error::Error::INTERNAL_ERROR(
251 "connection failed without error".to_string(),
252 ))
253 }))
254 }
255 })
256 }
257}
258
259pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
264
265impl Future for ZeroRttAccepted {
266 type Output = bool;
267 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
268 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
269 }
270}
271
272#[must_use = "connection drivers must be spawned for their connections to function"]
283#[derive(Debug)]
284struct ConnectionDriver(ConnectionRef);
285
286impl Future for ConnectionDriver {
287 type Output = Result<(), io::Error>;
288
289 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
290 let conn = &mut *self.0.state.lock("poll");
291
292 let span = debug_span!("drive", id = conn.handle.0);
293 let _guard = span.enter();
294
295 if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
296 conn.terminate(e, &self.0.shared);
297 return Poll::Ready(Ok(()));
298 }
299 let mut keep_going = conn.drive_transmit(cx)?;
300 keep_going |= conn.drive_timer(cx);
303 conn.forward_endpoint_events();
304 conn.forward_app_events(&self.0.shared);
305
306 if conn.connected && !conn.binding_started {
308 if let Some(rt) = crate::trust::global_runtime() {
309 conn.inner.set_delay_new_token_until_binding(true);
311
312 let hl_conn_server = Connection(self.0.clone());
313 let hl_conn_client = hl_conn_server.clone();
314 let store = rt.store.clone();
315 let policy = rt.policy.clone();
316 let signer = rt.local_secret_key.clone();
317 let spki = rt.local_spki.clone();
318 let runtime = conn.runtime.clone();
319
320 if conn.inner.side().is_server() {
321 runtime.spawn(Box::pin(async move {
322 match crate::trust::recv_verify_binding(&hl_conn_server, &*store, &policy)
323 .await
324 {
325 Ok(peer) => {
326 hl_conn_server
327 .0
328 .state
329 .lock("set peer")
330 .inner
331 .set_token_binding_peer_id(peer);
332 hl_conn_server
333 .0
334 .state
335 .lock("allow tokens")
336 .inner
337 .set_delay_new_token_until_binding(false);
338 }
339 Err(_e) => {
340 hl_conn_server.close(0u32.into(), b"channel binding failed");
341 }
342 }
343 }));
344 }
345
346 if conn.inner.side().is_client() {
347 runtime.spawn(Box::pin(async move {
348 if let Ok(exp) = crate::trust::derive_exporter(&hl_conn_client) {
349 let _ =
350 crate::trust::send_binding(&hl_conn_client, &exp, &signer, &spki)
351 .await;
352 }
353 }));
354 }
355
356 conn.binding_started = true;
357 }
358 }
359
360 if !conn.inner.is_drained() {
361 if keep_going {
362 cx.waker().wake_by_ref();
364 } else {
365 conn.driver = Some(cx.waker().clone());
366 }
367 return Poll::Pending;
368 }
369 if conn.error.is_none() {
370 unreachable!("drained connections always have an error");
371 }
372 Poll::Ready(Ok(()))
373 }
374}
375
376#[derive(Debug, Clone)]
392pub struct Connection(ConnectionRef);
393
394impl Connection {
395 pub fn open_uni(&self) -> OpenUni<'_> {
401 OpenUni {
402 conn: &self.0,
403 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
404 }
405 }
406
407 pub fn open_bi(&self) -> OpenBi<'_> {
418 OpenBi {
419 conn: &self.0,
420 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
421 }
422 }
423
424 pub fn accept_uni(&self) -> AcceptUni<'_> {
426 AcceptUni {
427 conn: &self.0,
428 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
429 }
430 }
431
432 pub fn accept_bi(&self) -> AcceptBi<'_> {
443 AcceptBi {
444 conn: &self.0,
445 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
446 }
447 }
448
449 pub fn read_datagram(&self) -> ReadDatagram<'_> {
451 ReadDatagram {
452 conn: &self.0,
453 notify: self.0.shared.datagram_received.notified(),
454 }
455 }
456
457 pub async fn closed(&self) -> ConnectionError {
463 {
464 let conn = self.0.state.lock("closed");
465 if let Some(error) = conn.error.as_ref() {
466 return error.clone();
467 }
468 self.0.shared.closed.notified()
472 }
473 .await;
474 self.0
475 .state
476 .lock("closed")
477 .error
478 .as_ref()
479 .unwrap_or_else(|| &crate::connection::ConnectionError::LocallyClosed)
480 .clone()
481 }
482
483 pub fn is_alive(&self) -> bool {
489 self.0.state.lock("is_alive").error.is_none()
490 }
491
492 pub fn close_reason(&self) -> Option<ConnectionError> {
496 self.0.state.lock("close_reason").error.clone()
497 }
498
499 pub(crate) fn supports_ack_receive_v1(&self) -> bool {
500 self.0
501 .state
502 .lock("supports_ack_receive_v1")
503 .inner
504 .supports_ack_receive_v1()
505 }
506
507 pub fn wake_transmit(&self) {
542 self.0.state.lock("wake_transmit").wake();
543 }
544
545 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
547 let conn = &mut *self.0.state.lock("close");
548 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
549 }
550
551 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
560 let conn = &mut *self.0.state.lock("send_datagram");
561 if let Some(ref x) = conn.error {
562 return Err(SendDatagramError::ConnectionLost(x.clone()));
563 }
564 use crate::SendDatagramError::*;
565 match conn.inner.datagrams().send(data, true) {
566 Ok(()) => {
567 conn.wake();
568 Ok(())
569 }
570 Err(e) => Err(match e {
571 Blocked(..) => unreachable!(),
572 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
573 Disabled => SendDatagramError::Disabled,
574 TooLarge => SendDatagramError::TooLarge,
575 }),
576 }
577 }
578
579 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
588 SendDatagram {
589 conn: &self.0,
590 data: Some(data),
591 notify: self.0.shared.datagrams_unblocked.notified(),
592 }
593 }
594
595 pub fn max_datagram_size(&self) -> Option<usize> {
607 self.0
608 .state
609 .lock("max_datagram_size")
610 .inner
611 .datagrams()
612 .max_size()
613 }
614
615 pub fn datagram_send_buffer_space(&self) -> usize {
620 self.0
621 .state
622 .lock("datagram_send_buffer_space")
623 .inner
624 .datagrams()
625 .send_buffer_space()
626 }
627
628 pub fn datagram_drop_stats(&self) -> DatagramDropStats {
630 self.0
631 .state
632 .lock("datagram_drop_stats")
633 .inner
634 .stats()
635 .datagram_drops
636 }
637
638 pub fn on_datagram_drop(&self) -> DatagramDrop<'_> {
640 DatagramDrop {
641 conn: &self.0,
642 notify: self.0.shared.datagram_dropped.notified(),
643 }
644 }
645
646 pub fn send_nat_address_advertisement(
648 &self,
649 address: SocketAddr,
650 priority: u32,
651 ) -> Result<u64, crate::ConnectionError> {
652 let conn = &mut *self.0.state.lock("send_nat_address_advertisement");
653 conn.inner.send_nat_address_advertisement(address, priority)
654 }
655
656 pub fn send_nat_punch_coordination(
658 &self,
659 paired_with_sequence_number: u64,
660 address: SocketAddr,
661 round: u32,
662 ) -> Result<(), crate::ConnectionError> {
663 let conn = &mut *self.0.state.lock("send_nat_punch_coordination");
664 conn.inner
665 .send_nat_punch_coordination(paired_with_sequence_number, address, round)
666 }
667
668 pub fn send_nat_punch_via_relay(
673 &self,
674 target_peer_id: [u8; 32],
675 our_address: SocketAddr,
676 round: u32,
677 ) -> Result<(), crate::ConnectionError> {
678 let conn = &mut *self.0.state.lock("send_nat_punch_via_relay");
679 conn.inner
680 .send_nat_punch_via_relay(target_peer_id, our_address, round)
681 }
682
683 pub fn side(&self) -> Side {
685 self.0.state.lock("side").inner.side()
686 }
687
688 pub fn remote_address(&self) -> SocketAddr {
693 self.0.state.lock("remote_address").inner.remote_address()
694 }
695
696 pub fn observed_address(&self) -> Option<SocketAddr> {
707 self.0
708 .state
709 .lock("observed_address")
710 .inner
711 .observed_address()
712 }
713
714 pub fn all_observed_addresses(&self) -> Vec<SocketAddr> {
718 self.0
719 .state
720 .lock("all_observed_addresses")
721 .inner
722 .all_observed_addresses()
723 }
724
725 pub(crate) fn observed_address_updated(&self) -> Notified<'_> {
727 self.0.shared.observed_address_updated.notified()
728 }
729
730 pub fn local_ip(&self) -> Option<IpAddr> {
740 self.0.state.lock("local_ip").inner.local_ip()
741 }
742
743 pub fn rtt(&self) -> Duration {
745 self.0.state.lock("rtt").inner.rtt()
746 }
747
748 pub fn stats(&self) -> ConnectionStats {
750 self.0.state.lock("stats").inner.stats()
751 }
752
753 pub fn congestion_state(&self) -> Box<dyn Controller> {
755 self.0
756 .state
757 .lock("congestion_state")
758 .inner
759 .congestion_state()
760 .clone_box()
761 }
762
763 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
771 self.0
772 .state
773 .lock("handshake_data")
774 .inner
775 .crypto_session()
776 .handshake_data()
777 }
778
779 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
785 self.0
786 .state
787 .lock("peer_identity")
788 .inner
789 .crypto_session()
790 .peer_identity()
791 }
792
793 pub fn stable_id(&self) -> usize {
798 self.0.stable_id()
799 }
800
801 pub fn is_pqc(&self) -> bool {
806 let state = self.0.state.lock("is_pqc");
807 state.inner.is_pqc()
808 }
809
810 pub fn debug_kem_only(&self) -> bool {
814 crate::crypto::rustls::debug_kem_only_enabled()
815 }
816
817 pub fn force_key_update(&self) {
821 self.0
822 .state
823 .lock("force_key_update")
824 .inner
825 .force_key_update()
826 }
827
828 pub fn export_keying_material(
837 &self,
838 output: &mut [u8],
839 label: &[u8],
840 context: &[u8],
841 ) -> Result<(), crate::crypto::ExportKeyingMaterialError> {
842 self.0
843 .state
844 .lock("export_keying_material")
845 .inner
846 .crypto_session()
847 .export_keying_material(output, label, context)
848 }
849
850 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
855 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
856 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
857 conn.wake();
859 }
860
861 pub fn set_receive_window(&self, receive_window: VarInt) {
863 let mut conn = self.0.state.lock("set_receive_window");
864 conn.inner.set_receive_window(receive_window);
865 conn.wake();
866 }
867
868 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
873 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
874 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
875 conn.wake();
877 }
878
879 #[cfg(feature = "__qlog")]
881 pub fn set_qlog(
882 &mut self,
883 writer: Box<dyn std::io::Write + Send + Sync>,
884 title: Option<String>,
885 description: Option<String>,
886 ) {
887 let mut state = self.0.state.lock("__qlog");
888 state
889 .inner
890 .set_qlog(writer, title, description, Instant::now());
891 }
892}
893
894pin_project! {
895 pub struct OpenUni<'a> {
897 conn: &'a ConnectionRef,
898 #[pin]
899 notify: Notified<'a>,
900 }
901}
902
903impl Future for OpenUni<'_> {
904 type Output = Result<SendStream, ConnectionError>;
905 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
906 let this = self.project();
907 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
908 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
909 }
910}
911
912pin_project! {
913 pub struct OpenBi<'a> {
915 conn: &'a ConnectionRef,
916 #[pin]
917 notify: Notified<'a>,
918 }
919}
920
921impl Future for OpenBi<'_> {
922 type Output = Result<(SendStream, RecvStream), ConnectionError>;
923 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
924 let this = self.project();
925 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
926
927 Poll::Ready(Ok((
928 SendStream::new(conn.clone(), id, is_0rtt),
929 RecvStream::new(conn, id, is_0rtt),
930 )))
931 }
932}
933
934fn poll_open<'a>(
935 ctx: &mut Context<'_>,
936 conn: &'a ConnectionRef,
937 mut notify: Pin<&mut Notified<'a>>,
938 dir: Dir,
939) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
940 let mut state = conn.state.lock("poll_open");
941 if let Some(ref e) = state.error {
942 return Poll::Ready(Err(e.clone()));
943 } else if let Some(id) = state.inner.streams().open(dir) {
944 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
945 drop(state); return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
947 }
948 loop {
949 match notify.as_mut().poll(ctx) {
950 Poll::Pending => return Poll::Pending,
952 Poll::Ready(()) => {
954 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
955 }
956 }
957 }
958}
959
960pin_project! {
961 pub struct AcceptUni<'a> {
963 conn: &'a ConnectionRef,
964 #[pin]
965 notify: Notified<'a>,
966 }
967}
968
969impl Future for AcceptUni<'_> {
970 type Output = Result<RecvStream, ConnectionError>;
971
972 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
973 let this = self.project();
974 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
975 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
976 }
977}
978
979pin_project! {
980 pub struct AcceptBi<'a> {
982 conn: &'a ConnectionRef,
983 #[pin]
984 notify: Notified<'a>,
985 }
986}
987
988impl Future for AcceptBi<'_> {
989 type Output = Result<(SendStream, RecvStream), ConnectionError>;
990
991 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
992 let this = self.project();
993 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
994 Poll::Ready(Ok((
995 SendStream::new(conn.clone(), id, is_0rtt),
996 RecvStream::new(conn, id, is_0rtt),
997 )))
998 }
999}
1000
1001fn poll_accept<'a>(
1002 ctx: &mut Context<'_>,
1003 conn: &'a ConnectionRef,
1004 mut notify: Pin<&mut Notified<'a>>,
1005 dir: Dir,
1006) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1007 let mut state = conn.state.lock("poll_accept");
1008 if let Some(id) = state.inner.streams().accept(dir) {
1011 let is_0rtt = state.inner.is_handshaking();
1012 state.wake(); drop(state); return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1015 } else if let Some(ref e) = state.error {
1016 return Poll::Ready(Err(e.clone()));
1017 }
1018 loop {
1019 match notify.as_mut().poll(ctx) {
1020 Poll::Pending => return Poll::Pending,
1022 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1024 }
1025 }
1026}
1027
1028pin_project! {
1029 pub struct ReadDatagram<'a> {
1031 conn: &'a ConnectionRef,
1032 #[pin]
1033 notify: Notified<'a>,
1034 }
1035}
1036
1037impl Future for ReadDatagram<'_> {
1038 type Output = Result<Bytes, ConnectionError>;
1039 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1040 let mut this = self.project();
1041 let mut state = this.conn.state.lock("ReadDatagram::poll");
1042 match state.inner.datagrams().recv() {
1045 Some(x) => {
1046 return Poll::Ready(Ok(x));
1047 }
1048 _ => {
1049 if let Some(ref e) = state.error {
1050 return Poll::Ready(Err(e.clone()));
1051 }
1052 }
1053 }
1054 loop {
1055 match this.notify.as_mut().poll(ctx) {
1056 Poll::Pending => return Poll::Pending,
1058 Poll::Ready(()) => this
1060 .notify
1061 .set(this.conn.shared.datagram_received.notified()),
1062 }
1063 }
1064 }
1065}
1066
1067pin_project! {
1068 pub struct DatagramDrop<'a> {
1070 conn: &'a ConnectionRef,
1071 #[pin]
1072 notify: Notified<'a>,
1073 }
1074}
1075
1076impl Future for DatagramDrop<'_> {
1077 type Output = Result<DatagramDropStats, ConnectionError>;
1078
1079 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1080 let mut this = self.project();
1081 let mut state = this.conn.state.lock("DatagramDrop::poll");
1082 if let Some(drop) = state.datagram_drop_events.pop_front() {
1083 return Poll::Ready(Ok(drop));
1084 }
1085 if let Some(ref e) = state.error {
1086 return Poll::Ready(Err(e.clone()));
1087 }
1088 loop {
1089 match this.notify.as_mut().poll(ctx) {
1090 Poll::Pending => return Poll::Pending,
1092 Poll::Ready(()) => this
1093 .notify
1094 .set(this.conn.shared.datagram_dropped.notified()),
1095 }
1096 }
1097 }
1098}
1099
1100pin_project! {
1101 pub struct SendDatagram<'a> {
1103 conn: &'a ConnectionRef,
1104 data: Option<Bytes>,
1105 #[pin]
1106 notify: Notified<'a>,
1107 }
1108}
1109
1110impl Future for SendDatagram<'_> {
1111 type Output = Result<(), SendDatagramError>;
1112 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1113 let mut this = self.project();
1114 let mut state = this.conn.state.lock("SendDatagram::poll");
1115 if let Some(ref e) = state.error {
1116 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1117 }
1118 use crate::SendDatagramError::*;
1119 match state.inner.datagrams().send(
1120 this.data.take().ok_or_else(|| {
1121 error!("SendDatagram future polled without data");
1122 SendDatagramError::ConnectionLost(ConnectionError::LocallyClosed)
1123 })?,
1124 false,
1125 ) {
1126 Ok(()) => {
1127 state.wake();
1128 Poll::Ready(Ok(()))
1129 }
1130 Err(e) => Poll::Ready(Err(match e {
1131 Blocked(data) => {
1132 this.data.replace(data);
1133 loop {
1134 match this.notify.as_mut().poll(ctx) {
1135 Poll::Pending => return Poll::Pending,
1136 Poll::Ready(()) => this
1138 .notify
1139 .set(this.conn.shared.datagrams_unblocked.notified()),
1140 }
1141 }
1142 }
1143 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1144 Disabled => SendDatagramError::Disabled,
1145 TooLarge => SendDatagramError::TooLarge,
1146 })),
1147 }
1148 }
1149}
1150
1151#[derive(Debug)]
1152pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1153
1154impl ConnectionRef {
1155 #[allow(clippy::too_many_arguments)]
1156 fn new(
1157 handle: ConnectionHandle,
1158 conn: crate::Connection,
1159 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1160 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1161 on_handshake_data: oneshot::Sender<()>,
1162 on_connected: oneshot::Sender<bool>,
1163 socket: Arc<dyn AsyncUdpSocket>,
1164 runtime: Arc<dyn Runtime>,
1165 ) -> Self {
1166 Self(Arc::new(ConnectionInner {
1167 state: Mutex::new(State {
1168 inner: conn,
1169 driver: None,
1170 handle,
1171 on_handshake_data: Some(on_handshake_data),
1172 on_connected: Some(on_connected),
1173 connected: false,
1174 timer: None,
1175 timer_deadline: None,
1176 conn_events,
1177 endpoint_events,
1178 blocked_writers: FxHashMap::default(),
1179 blocked_readers: FxHashMap::default(),
1180 stopped: FxHashMap::default(),
1181 error: None,
1182 ref_count: 0,
1183 datagram_drop_events: VecDeque::new(),
1184 io_poller: socket.clone().create_io_poller(),
1185 socket,
1186 runtime,
1187 send_buffer: Vec::new(),
1188 buffered_transmit: None,
1189 binding_started: false,
1190 }),
1191 shared: Shared::default(),
1192 }))
1193 }
1194
1195 fn stable_id(&self) -> usize {
1196 &*self.0 as *const _ as usize
1197 }
1198}
1199
1200impl Clone for ConnectionRef {
1201 fn clone(&self) -> Self {
1202 self.state.lock("clone").ref_count += 1;
1203 Self(self.0.clone())
1204 }
1205}
1206
1207impl Drop for ConnectionRef {
1208 fn drop(&mut self) {
1209 let conn = &mut *self.state.lock("drop");
1210 if let Some(x) = conn.ref_count.checked_sub(1) {
1211 conn.ref_count = x;
1212 if x == 0 && !conn.inner.is_closed() {
1213 conn.implicit_close(&self.shared);
1218 }
1219 }
1220 }
1221}
1222
1223impl std::ops::Deref for ConnectionRef {
1224 type Target = ConnectionInner;
1225 fn deref(&self) -> &Self::Target {
1226 &self.0
1227 }
1228}
1229
1230#[derive(Debug)]
1231pub(crate) struct ConnectionInner {
1232 pub(crate) state: Mutex<State>,
1233 pub(crate) shared: Shared,
1234}
1235
1236#[derive(Debug, Default)]
1237pub(crate) struct Shared {
1238 stream_budget_available: [Notify; 2],
1241 stream_incoming: [Notify; 2],
1243 datagram_received: Notify,
1244 datagrams_unblocked: Notify,
1245 datagram_dropped: Notify,
1246 observed_address_updated: Notify,
1247 closed: Notify,
1248}
1249
1250pub(crate) struct State {
1251 pub(crate) inner: crate::Connection,
1252 driver: Option<Waker>,
1253 handle: ConnectionHandle,
1254 on_handshake_data: Option<oneshot::Sender<()>>,
1255 on_connected: Option<oneshot::Sender<bool>>,
1256 connected: bool,
1257 timer: Option<Pin<Box<dyn AsyncTimer>>>,
1258 timer_deadline: Option<Instant>,
1259 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1260 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1261 pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1262 pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1263 pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1264 pub(crate) error: Option<ConnectionError>,
1266 ref_count: usize,
1268 datagram_drop_events: VecDeque<DatagramDropStats>,
1269 socket: Arc<dyn AsyncUdpSocket>,
1270 io_poller: Pin<Box<dyn UdpPoller>>,
1271 runtime: Arc<dyn Runtime>,
1272 send_buffer: Vec<u8>,
1273 buffered_transmit: Option<crate::Transmit>,
1275 binding_started: bool,
1277}
1278
1279impl State {
1280 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1281 let now = self.runtime.now();
1282 let mut transmits = 0;
1283
1284 let max_datagrams = self
1285 .socket
1286 .max_transmit_segments()
1287 .min(MAX_TRANSMIT_SEGMENTS);
1288
1289 loop {
1290 let t = match self.buffered_transmit.take() {
1292 Some(t) => t,
1293 None => {
1294 self.send_buffer.clear();
1295 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1296 match self
1297 .inner
1298 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1299 {
1300 Some(t) => {
1301 transmits += match t.segment_size {
1302 None => 1,
1303 Some(s) => t.size.div_ceil(s), };
1305 t
1306 }
1307 None => break,
1308 }
1309 }
1310 };
1311
1312 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1313 self.buffered_transmit = Some(t);
1315 return Ok(false);
1316 }
1317
1318 let len = t.size;
1319 let retry = match self
1320 .socket
1321 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1322 {
1323 Ok(()) => false,
1324 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1325 Err(e) => return Err(e),
1326 };
1327 if retry {
1328 self.buffered_transmit = Some(t);
1333 continue;
1334 }
1335
1336 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1337 return Ok(true);
1342 }
1343 }
1344
1345 Ok(false)
1346 }
1347
1348 fn forward_endpoint_events(&mut self) {
1349 while let Some(event) = self.inner.poll_endpoint_events() {
1350 let _ = self.endpoint_events.send((self.handle, event));
1352 }
1353 }
1354
1355 fn process_conn_events(
1357 &mut self,
1358 shared: &Shared,
1359 cx: &mut Context,
1360 ) -> Result<(), ConnectionError> {
1361 loop {
1362 match self.conn_events.poll_recv(cx) {
1363 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1364 self.socket = socket;
1365 self.io_poller = self.socket.clone().create_io_poller();
1366 self.inner.local_address_changed();
1367 }
1368 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1369 let mut observed_before = self.inner.all_observed_addresses();
1370 observed_before.sort_unstable();
1371 observed_before.dedup();
1372
1373 self.inner.handle_event(event);
1374
1375 let mut observed_after = self.inner.all_observed_addresses();
1376 observed_after.sort_unstable();
1377 observed_after.dedup();
1378 if observed_before != observed_after {
1379 shared.observed_address_updated.notify_one();
1380 }
1381 }
1382 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1383 self.close(error_code, reason, shared);
1384 }
1385 Poll::Ready(None) => {
1386 return Err(ConnectionError::TransportError(crate::TransportError {
1387 code: crate::TransportErrorCode::INTERNAL_ERROR,
1388 frame: None,
1389 reason: "endpoint driver future was dropped".to_string(),
1390 }));
1391 }
1392 Poll::Pending => {
1393 return Ok(());
1394 }
1395 }
1396 }
1397 }
1398
1399 fn forward_app_events(&mut self, shared: &Shared) {
1400 while let Some(event) = self.inner.poll() {
1401 use crate::Event::*;
1402 match event {
1403 HandshakeDataReady => {
1404 if let Some(x) = self.on_handshake_data.take() {
1405 let _ = x.send(());
1406 }
1407 }
1408 Connected => {
1409 self.connected = true;
1410 if let Some(x) = self.on_connected.take() {
1411 let _ = x.send(self.inner.accepted_0rtt());
1413 }
1414 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1415 wake_all(&mut self.blocked_writers);
1418 wake_all(&mut self.blocked_readers);
1419 wake_all_notify(&mut self.stopped);
1420 }
1421 }
1422 ConnectionLost { reason } => {
1423 self.terminate(reason, shared);
1424 }
1425 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1426 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1427 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1428 }
1429 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1430 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1431 }
1432 DatagramReceived => {
1433 shared.datagram_received.notify_waiters();
1434 }
1435 DatagramsUnblocked => {
1436 shared.datagrams_unblocked.notify_waiters();
1437 }
1438 DatagramDropped(drop) => {
1439 tracing::debug!(
1441 datagrams = drop.datagrams,
1442 bytes = drop.bytes,
1443 "datagrams dropped due to receive buffer overflow"
1444 );
1445 self.datagram_drop_events.push_back(drop);
1446 shared.datagram_dropped.notify_waiters();
1447 }
1448 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1449 Stream(StreamEvent::Available { dir }) => {
1450 shared.stream_budget_available[dir as usize].notify_waiters();
1452 }
1453 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1454 Stream(StreamEvent::Stopped { id, .. }) => {
1455 wake_stream_notify(id, &mut self.stopped);
1456 wake_stream(id, &mut self.blocked_writers);
1457 }
1458 }
1459 }
1460 }
1461
1462 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1463 match self.inner.poll_timeout() {
1467 Some(deadline) => {
1468 if let Some(delay) = &mut self.timer {
1469 if self
1472 .timer_deadline
1473 .map(|current_deadline| current_deadline != deadline)
1474 .unwrap_or(true)
1475 {
1476 delay.as_mut().reset(deadline);
1477 }
1478 } else {
1479 self.timer = Some(self.runtime.new_timer(deadline));
1480 }
1481 self.timer_deadline = Some(deadline);
1483 }
1484 None => {
1485 self.timer_deadline = None;
1486 return false;
1487 }
1488 }
1489
1490 if self.timer_deadline.is_none() {
1491 return false;
1492 }
1493
1494 let delay = match self.timer.as_mut() {
1495 Some(timer) => timer.as_mut(),
1496 None => {
1497 error!("Timer missing in state where it should exist");
1498 return false;
1499 }
1500 };
1501 if delay.poll(cx).is_pending() {
1502 return false;
1505 }
1506
1507 self.inner.handle_timeout(self.runtime.now());
1510 self.timer_deadline = None;
1511 true
1512 }
1513
1514 pub(crate) fn wake(&mut self) {
1516 if let Some(x) = self.driver.take() {
1517 x.wake();
1518 }
1519 }
1520
1521 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1523 self.error = Some(reason.clone());
1524 if let Some(x) = self.on_handshake_data.take() {
1525 let _ = x.send(());
1526 }
1527 wake_all(&mut self.blocked_writers);
1528 wake_all(&mut self.blocked_readers);
1529 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1530 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1531 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1532 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1533 shared.datagram_received.notify_waiters();
1534 shared.datagrams_unblocked.notify_waiters();
1535 shared.datagram_dropped.notify_waiters();
1536 shared.observed_address_updated.notify_waiters();
1537 if let Some(x) = self.on_connected.take() {
1538 let _ = x.send(false);
1539 }
1540 wake_all_notify(&mut self.stopped);
1541 shared.closed.notify_waiters();
1542 }
1543
1544 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1545 self.inner.close(self.runtime.now(), error_code, reason);
1546 self.terminate(ConnectionError::LocallyClosed, shared);
1547 self.wake();
1548 }
1549
1550 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1552 self.close(0u32.into(), Bytes::new(), shared);
1553 }
1554
1555 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1556 if self.inner.is_handshaking()
1557 || self.inner.accepted_0rtt()
1558 || self.inner.side().is_server()
1559 {
1560 Ok(())
1561 } else {
1562 Err(())
1563 }
1564 }
1565}
1566
1567impl Drop for State {
1568 fn drop(&mut self) {
1569 if !self.inner.is_drained() {
1570 let _ = self
1572 .endpoint_events
1573 .send((self.handle, crate::EndpointEvent::drained()));
1574 }
1575 }
1576}
1577
1578impl fmt::Debug for State {
1579 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1580 f.debug_struct("State").field("inner", &self.inner).finish()
1581 }
1582}
1583
1584fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1585 if let Some(waker) = wakers.remove(&stream_id) {
1586 waker.wake();
1587 }
1588}
1589
1590fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1591 wakers.drain().for_each(|(_, waker)| waker.wake())
1592}
1593
1594fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1595 if let Some(notify) = wakers.remove(&stream_id) {
1596 notify.notify_waiters()
1597 }
1598}
1599
1600fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1601 wakers
1602 .drain()
1603 .for_each(|(_, notify)| notify.notify_waiters())
1604}
1605
1606#[derive(Debug, Error, Clone, Eq, PartialEq)]
1608pub enum SendDatagramError {
1609 #[error("datagrams not supported by peer")]
1611 UnsupportedByPeer,
1612 #[error("datagram support disabled")]
1614 Disabled,
1615 #[error("datagram too large")]
1620 TooLarge,
1621 #[error("connection lost")]
1623 ConnectionLost(#[from] ConnectionError),
1624}
1625
1626const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1631
1632const MAX_TRANSMIT_SEGMENTS: usize = 10;