1#[cfg(not(wasm_browser))]
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
19use iroh_relay::{RelayConfig, RelayMap};
20#[cfg(not(wasm_browser))]
21use n0_error::bail;
22use n0_error::{e, ensure, stack_error};
23use n0_watcher::Watcher;
24#[cfg(not(wasm_browser))]
25use netdev::ipnet::{Ipv4Net, Ipv6Net};
26use tracing::{debug, instrument, trace, warn};
27use url::Url;
28
29use self::hooks::EndpointHooksList;
30pub use super::socket::{
31 BindError, DirectAddr, DirectAddrType, PathInfo,
32 remote_map::{PathInfoList, RemoteInfo, Source, TransportAddrInfo, TransportAddrUsage},
33};
34#[cfg(wasm_browser)]
35use crate::address_lookup::PkarrResolver;
36#[cfg(not(wasm_browser))]
37use crate::dns::DnsResolver;
38use crate::{
39 NetReport,
40 address_lookup::{
41 ConcurrentAddressLookup, DynIntoAddressLookup, Error as AddressLookupError,
42 IntoAddressLookup, UserData,
43 },
44 endpoint::presets::Preset,
45 metrics::EndpointMetrics,
46 socket::{self, Handle, RemoteStateActorStoppedError, mapped_addrs::MappedAddr},
47 tls::{self, DEFAULT_MAX_TLS_TICKETS},
48};
49
50#[cfg(not(wasm_browser))]
51mod bind;
52mod connection;
53pub(crate) mod hooks;
54pub mod presets;
55pub(crate) mod quic;
56
57#[cfg(not(wasm_browser))]
58pub use bind::{BindOpts, InvalidSocketAddr, ToSocketAddr};
59pub use hooks::{AfterHandshakeOutcome, BeforeConnectOutcome, EndpointHooks};
60
61#[cfg(feature = "qlog")]
62pub use self::quic::{QlogConfig, QlogFactory, QlogFileFactory};
63pub use self::{
64 connection::{
65 Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
66 ConnectionInfo, ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt,
67 IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection,
68 RemoteEndpointIdError, RetryError, ZeroRttStatus,
69 },
70 quic::{
71 AcceptBi, AcceptUni, AckFrequencyConfig, AeadKey, ApplicationClose, Chunk, ClosedStream,
72 ConnectionClose, ConnectionError, ConnectionStats, Controller, ControllerFactory,
73 ControllerMetrics, CryptoError, Dir, ExportKeyingMaterialError, FrameStats, FrameType,
74 HandshakeTokenKey, HeaderKey, IdleTimeout, Keys, MtuDiscoveryConfig, OpenBi, OpenUni,
75 PacketKey, PathId, PathStats, QuicConnectError, QuicTransportConfig,
76 QuicTransportConfigBuilder, ReadDatagram, ReadError, ReadExactError, ReadToEndError,
77 RecvStream, ResetError, RttEstimator, SendDatagram, SendDatagramError, SendStream,
78 ServerConfig, ServerConfigBuilder, Side, StoppedError, StreamId, TimeSource, TokenLog,
79 TokenReuseError, TransportError, TransportErrorCode, TransportParameters, UdpStats,
80 UnorderedRecvStream, UnsupportedVersion, ValidationTokenConfig, VarInt,
81 VarIntBoundsExceeded, WriteError, Written,
82 },
83};
84#[cfg(not(wasm_browser))]
85use crate::socket::transports::IpConfig;
86use crate::socket::transports::TransportConfig;
87
88#[derive(Debug)]
95pub struct Builder {
96 secret_key: Option<SecretKey>,
97 alpn_protocols: Vec<Vec<u8>>,
98 transport_config: QuicTransportConfig,
99 keylog: bool,
100 address_lookup: Vec<Box<dyn DynIntoAddressLookup>>,
101 address_lookup_user_data: Option<UserData>,
102 proxy_url: Option<Url>,
103 #[cfg(not(wasm_browser))]
104 dns_resolver: Option<DnsResolver>,
105 #[cfg(any(test, feature = "test-utils"))]
106 insecure_skip_relay_cert_verify: bool,
107 transports: Vec<TransportConfig>,
108 max_tls_tickets: usize,
109 hooks: EndpointHooksList,
110}
111
112impl From<RelayMode> for Option<TransportConfig> {
113 fn from(mode: RelayMode) -> Self {
114 match mode {
115 RelayMode::Disabled => None,
116 RelayMode::Default => Some(TransportConfig::Relay {
117 relay_map: mode.relay_map(),
118 is_user_defined: true,
119 }),
120 RelayMode::Staging => Some(TransportConfig::Relay {
121 relay_map: mode.relay_map(),
122 is_user_defined: true,
123 }),
124 RelayMode::Custom(relay_map) => Some(TransportConfig::Relay {
125 relay_map,
126 is_user_defined: true,
127 }),
128 }
129 }
130}
131
132impl Builder {
133 pub fn new<P: Preset>(preset: P) -> Self {
140 Self::empty(RelayMode::Disabled).preset(preset)
141 }
142
143 pub fn preset<P: Preset>(mut self, preset: P) -> Self {
145 self = preset.apply(self);
146 self
147 }
148
149 pub fn empty(relay_mode: RelayMode) -> Self {
151 let mut transports = vec![
152 #[cfg(not(wasm_browser))]
153 TransportConfig::default_ipv4(),
154 #[cfg(not(wasm_browser))]
155 TransportConfig::default_ipv6(),
156 ];
157 if let Some(relay) = relay_mode.into() {
158 transports.push(relay);
159 }
160 Self {
161 secret_key: Default::default(),
162 alpn_protocols: Default::default(),
163 transport_config: QuicTransportConfig::default(),
164 keylog: Default::default(),
165 address_lookup: Default::default(),
166 address_lookup_user_data: Default::default(),
167 proxy_url: None,
168 #[cfg(not(wasm_browser))]
169 dns_resolver: None,
170 #[cfg(any(test, feature = "test-utils"))]
171 insecure_skip_relay_cert_verify: false,
172 max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
173 transports,
174 hooks: Default::default(),
175 }
176 }
177
178 pub async fn bind(self) -> Result<Endpoint, BindError> {
182 let mut rng = rand::rng();
183 let secret_key = self
184 .secret_key
185 .unwrap_or_else(move || SecretKey::generate(&mut rng));
186
187 let static_config = StaticConfig {
188 transport_config: self.transport_config.clone(),
189 tls_config: tls::TlsConfig::new(secret_key.clone(), self.max_tls_tickets),
190 keylog: self.keylog,
191 };
192 let server_config = static_config.create_server_config(self.alpn_protocols);
193
194 #[cfg(not(wasm_browser))]
195 let dns_resolver = self.dns_resolver.unwrap_or_default();
196
197 let metrics = EndpointMetrics::default();
198
199 let sock_opts = socket::Options {
200 transports: self.transports,
201 secret_key,
202 address_lookup_user_data: self.address_lookup_user_data,
203 proxy_url: self.proxy_url,
204 #[cfg(not(wasm_browser))]
205 dns_resolver,
206 server_config,
207 #[cfg(any(test, feature = "test-utils"))]
208 insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
209 metrics,
210 hooks: self.hooks,
211 };
212
213 let sock = socket::Socket::spawn(sock_opts).await?;
214 trace!("created socket");
215 debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");
216
217 let ep = Endpoint {
218 sock,
219 static_config: Arc::new(static_config),
220 };
221
222 for create_service in self.address_lookup {
224 let service = create_service.into_address_lookup(&ep)?;
225 ep.address_lookup().add_boxed(service);
226 }
227
228 Ok(ep)
229 }
230
231 #[cfg(not(wasm_browser))]
281 pub fn bind_addr<A>(self, addr: A) -> Result<Self, InvalidSocketAddr>
282 where
283 A: ToSocketAddr,
284 <A as ToSocketAddr>::Err: Into<InvalidSocketAddr>,
285 {
286 self.bind_addr_with_opts(addr, BindOpts::default())
287 }
288
289 #[cfg(not(wasm_browser))]
358 pub fn bind_addr_with_opts<A>(
359 mut self,
360 addr: A,
361 opts: BindOpts,
362 ) -> Result<Self, InvalidSocketAddr>
363 where
364 A: ToSocketAddr,
365 <A as ToSocketAddr>::Err: Into<InvalidSocketAddr>,
366 {
367 let addr = addr.to_socket_addr().map_err(Into::into)?;
368 match addr {
369 SocketAddr::V4(addr) => {
370 if self
371 .transports
372 .iter()
373 .any(|t| t.is_ipv4_default() && t.is_user_defined())
374 {
375 bail!(InvalidSocketAddr::DuplicateDefaultAddr);
376 }
377
378 let ip_net = Ipv4Net::new(*addr.ip(), opts.prefix_len())?;
379 self.transports.push(TransportConfig::Ip {
380 config: IpConfig::V4 {
381 ip_net,
382 port: addr.port(),
383 is_required: opts.is_required(),
384 is_default: opts.is_default_route(),
385 },
386 is_user_defined: true,
387 });
388 }
389 SocketAddr::V6(addr) => {
390 if self
391 .transports
392 .iter()
393 .any(|t| t.is_ipv6_default() && t.is_user_defined())
394 {
395 bail!(InvalidSocketAddr::DuplicateDefaultAddr);
396 }
397
398 let ip_net = Ipv6Net::new(*addr.ip(), opts.prefix_len())?;
399 self.transports.push(TransportConfig::Ip {
400 config: IpConfig::V6 {
401 ip_net,
402 scope_id: addr.scope_id(),
403 port: addr.port(),
404 is_required: opts.is_required(),
405 is_default: opts.is_default_route(),
406 },
407 is_user_defined: true,
408 });
409 }
410 }
411 Ok(self)
412 }
413
414 #[cfg(not(wasm_browser))]
416 pub fn clear_ip_transports(mut self) -> Self {
417 self.transports
418 .retain(|t| !matches!(t, TransportConfig::Ip { .. }));
419 self
420 }
421
422 pub fn clear_relay_transports(mut self) -> Self {
424 self.transports
425 .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
426 self
427 }
428
429 pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
438 self.secret_key = Some(secret_key);
439 self
440 }
441
442 pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
449 self.alpn_protocols = alpn_protocols;
450 self
451 }
452
453 pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
471 let transport: Option<_> = relay_mode.into();
472 match transport {
473 Some(transport) => {
474 if let Some(og) = self
475 .transports
476 .iter_mut()
477 .find(|t| matches!(t, TransportConfig::Relay { .. }))
478 {
479 *og = transport;
480 } else {
481 self.transports.push(transport);
482 }
483 }
484 None => {
485 self.transports
486 .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
487 }
488 }
489 self
490 }
491
492 pub fn clear_address_lookup(mut self) -> Self {
499 self.address_lookup.clear();
500 self
501 }
502
503 pub fn address_lookup(mut self, address_lookup: impl IntoAddressLookup) -> Self {
519 self.address_lookup.push(Box::new(address_lookup));
520 self
521 }
522
523 pub fn user_data_for_address_lookup(mut self, user_data: UserData) -> Self {
532 self.address_lookup_user_data = Some(user_data);
533 self
534 }
535
536 pub fn transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
550 self.transport_config = transport_config;
551 self
552 }
553
554 #[cfg(not(wasm_browser))]
564 pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
565 self.dns_resolver = Some(dns_resolver);
566 self
567 }
568
569 pub fn proxy_url(mut self, url: Url) -> Self {
571 self.proxy_url.replace(url);
572 self
573 }
574
575 pub fn proxy_from_env(mut self) -> Self {
582 self.proxy_url = proxy_url_from_env();
583 self
584 }
585
586 pub fn keylog(mut self, keylog: bool) -> Self {
594 self.keylog = keylog;
595 self
596 }
597
598 #[cfg(any(test, feature = "test-utils"))]
602 pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
603 self.insecure_skip_relay_cert_verify = skip_verify;
604 self
605 }
606
607 pub fn max_tls_tickets(mut self, n: usize) -> Self {
614 self.max_tls_tickets = n;
615 self
616 }
617
618 pub fn hooks(mut self, hooks: impl EndpointHooks + 'static) -> Self {
629 self.hooks.push(hooks);
630 self
631 }
632}
633
634#[derive(Debug)]
636struct StaticConfig {
637 tls_config: tls::TlsConfig,
638 transport_config: QuicTransportConfig,
639 keylog: bool,
640}
641
642impl StaticConfig {
643 fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> quinn_proto::ServerConfig {
645 let quic_server_config = self
646 .tls_config
647 .make_server_config(alpn_protocols, self.keylog);
648 let mut inner = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
649 inner.transport_config(self.transport_config.to_inner_arc());
650 inner
651 }
652}
653
654#[derive(Clone, Debug)]
680pub struct Endpoint {
681 pub(crate) sock: Handle,
683 static_config: Arc<StaticConfig>,
685}
686
687#[allow(missing_docs)]
688#[stack_error(derive, add_meta, from_sources)]
689#[non_exhaustive]
690#[allow(private_interfaces)]
691pub enum ConnectWithOptsError {
692 #[error("Connecting to ourself is not supported")]
693 SelfConnect,
694 #[error("No addressing information available")]
695 NoAddress { source: AddressLookupError },
696 #[error("Unable to connect to remote")]
697 Quinn {
698 #[error(std_err)]
699 source: QuicConnectError,
700 },
701 #[error("Internal consistency error")]
702 InternalConsistencyError {
703 source: RemoteStateActorStoppedError,
705 },
706 #[error("Connection was rejected locally")]
707 LocallyRejected,
708}
709
710#[allow(missing_docs)]
711#[stack_error(derive, add_meta, from_sources)]
712#[non_exhaustive]
713pub enum ConnectError {
714 #[error(transparent)]
715 Connect { source: ConnectWithOptsError },
716 #[error(transparent)]
717 Connecting { source: ConnectingError },
718 #[error(transparent)]
719 Connection {
720 #[error(std_err)]
721 source: ConnectionError,
722 },
723}
724
725impl Endpoint {
726 pub fn builder() -> Builder {
736 Builder::new(presets::N0)
737 }
738
739 pub fn empty_builder(relay_mode: RelayMode) -> Builder {
743 Builder::empty(relay_mode)
744 }
745
746 pub async fn bind() -> Result<Self, BindError> {
750 Self::builder().bind().await
751 }
752
753 pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) {
758 let server_config = self.static_config.create_server_config(alpns);
759 self.sock.endpoint().set_server_config(Some(server_config));
760 }
761
762 pub async fn insert_relay(
766 &self,
767 relay: RelayUrl,
768 config: Arc<RelayConfig>,
769 ) -> Option<Arc<RelayConfig>> {
770 self.sock.insert_relay(relay, config).await
771 }
772
773 pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayConfig>> {
777 self.sock.remove_relay(relay).await
778 }
779
780 pub async fn connect(
806 &self,
807 endpoint_addr: impl Into<EndpointAddr>,
808 alpn: &[u8],
809 ) -> Result<Connection, ConnectError> {
810 let endpoint_addr = endpoint_addr.into();
811 let remote = endpoint_addr.id;
812 let connecting = self
813 .connect_with_opts(endpoint_addr, alpn, Default::default())
814 .await?;
815 let conn = connecting.await?;
816
817 debug!(
818 me = %self.id().fmt_short(),
819 remote = %remote.fmt_short(),
820 alpn = %String::from_utf8_lossy(alpn),
821 "Connection established."
822 );
823 Ok(conn)
824 }
825
826 #[instrument(name = "connect", skip_all, fields(
841 me = %self.id().fmt_short(),
842 remote = tracing::field::Empty,
843 alpn = String::from_utf8_lossy(alpn).to_string(),
844 ))]
845 pub async fn connect_with_opts(
846 &self,
847 endpoint_addr: impl Into<EndpointAddr>,
848 alpn: &[u8],
849 options: ConnectOptions,
850 ) -> Result<Connecting, ConnectWithOptsError> {
851 let endpoint_addr: EndpointAddr = endpoint_addr.into();
852 if let BeforeConnectOutcome::Reject =
853 self.sock.hooks.before_connect(&endpoint_addr, alpn).await
854 {
855 return Err(e!(ConnectWithOptsError::LocallyRejected));
856 }
857 let endpoint_id = endpoint_addr.id;
858
859 tracing::Span::current().record("remote", tracing::field::display(endpoint_id.fmt_short()));
860
861 ensure!(endpoint_id != self.id(), ConnectWithOptsError::SelfConnect);
863
864 trace!(
865 dst_endpoint_id = %endpoint_id.fmt_short(),
866 relay_url = ?endpoint_addr.relay_urls().next().cloned(),
867 ip_addresses = ?endpoint_addr.ip_addrs().cloned().collect::<Vec<_>>(),
868 "connecting",
869 );
870
871 let mapped_addr = self.sock.resolve_remote(endpoint_addr).await??;
872
873 let transport_config = options
874 .transport_config
875 .map(|cfg| cfg.to_inner_arc())
876 .unwrap_or(self.static_config.transport_config.to_inner_arc());
877
878 let client_config = {
882 let mut alpn_protocols = vec![alpn.to_vec()];
883 alpn_protocols.extend(options.additional_alpns);
884 let quic_client_config = self
885 .static_config
886 .tls_config
887 .make_client_config(alpn_protocols, self.static_config.keylog);
888 let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
889 client_config.transport_config(transport_config.clone());
890 client_config
891 };
892
893 let dest_addr = mapped_addr.private_socket_addr();
894 let server_name = &tls::name::encode(endpoint_id);
895 let connect = self
896 .sock
897 .endpoint()
898 .connect_with(client_config, dest_addr, server_name)?;
899
900 Ok(Connecting::new(connect, self.clone(), endpoint_id))
901 }
902
903 pub fn accept(&self) -> Accept<'_> {
912 Accept {
913 inner: self.sock.endpoint().accept(),
914 ep: self.clone(),
915 }
916 }
917
918 pub fn secret_key(&self) -> &SecretKey {
922 &self.static_config.tls_config.secret_key
923 }
924
925 pub fn id(&self) -> EndpointId {
930 self.static_config.tls_config.secret_key.public()
931 }
932
933 pub fn addr(&self) -> EndpointAddr {
946 self.watch_addr().get()
947 }
948
949 #[cfg(not(wasm_browser))]
984 pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
985 let watch_addrs = self.sock.ip_addrs();
986 let watch_relay = self.sock.home_relay();
987 let endpoint_id = self.id();
988
989 watch_addrs.or(watch_relay).map(move |(addrs, relays)| {
990 EndpointAddr::from_parts(
991 endpoint_id,
992 relays
993 .into_iter()
994 .map(TransportAddr::Relay)
995 .chain(addrs.into_iter().map(|x| TransportAddr::Ip(x.addr))),
996 )
997 })
998 }
999
1000 #[cfg(wasm_browser)]
1006 pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
1007 let watch_relay = self.sock.home_relay();
1011 let endpoint_id = self.id();
1012 watch_relay.map(move |mut relays| {
1013 EndpointAddr::from_parts(endpoint_id, relays.into_iter().map(TransportAddr::Relay))
1014 })
1015 }
1016
1017 pub async fn online(&self) {
1063 self.sock.home_relay().initialized().await;
1064 }
1065
1066 #[doc(hidden)]
1095 pub fn net_report(&self) -> impl Watcher<Value = Option<NetReport>> + use<> {
1096 self.sock.net_report()
1097 }
1098
1099 #[cfg(not(wasm_browser))]
1104 pub fn bound_sockets(&self) -> Vec<SocketAddr> {
1105 self.sock
1106 .local_addr()
1107 .into_iter()
1108 .filter_map(|addr| addr.into_socket_addr())
1109 .collect()
1110 }
1111
1112 #[cfg(not(wasm_browser))]
1120 pub fn dns_resolver(&self) -> &DnsResolver {
1121 self.sock.dns_resolver()
1122 }
1123
1124 pub fn address_lookup(&self) -> &ConcurrentAddressLookup {
1128 self.sock.address_lookup()
1129 }
1130
1131 #[cfg(feature = "metrics")]
1246 pub fn metrics(&self) -> &EndpointMetrics {
1247 &self.sock.metrics
1248 }
1249
1250 pub async fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
1259 self.sock.remote_info(endpoint_id).await
1260 }
1261
1262 pub async fn network_change(&self) {
1275 self.sock.network_change().await;
1276 }
1277
1278 pub fn set_user_data_for_address_lookup(&self, user_data: Option<UserData>) {
1288 self.sock.set_user_data_for_address_lookup(user_data);
1289 }
1290
1291 pub async fn close(&self) {
1326 self.sock.close().await;
1327 }
1328
1329 pub fn is_closed(&self) -> bool {
1331 self.sock.is_closed()
1332 }
1333
1334 pub fn create_server_config_builder(&self, alpns: Vec<Vec<u8>>) -> ServerConfigBuilder {
1339 let inner = self.static_config.create_server_config(alpns);
1340 ServerConfigBuilder::new(inner, self.static_config.transport_config.clone())
1341 }
1342
1343 #[cfg(test)]
1346 pub(crate) fn socket(&self) -> Handle {
1347 self.sock.clone()
1348 }
1349 #[cfg(test)]
1350 pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
1351 self.sock.endpoint()
1352 }
1353}
1354
1355#[derive(Default, Debug, Clone)]
1357pub struct ConnectOptions {
1358 transport_config: Option<QuicTransportConfig>,
1359 additional_alpns: Vec<Vec<u8>>,
1360}
1361
1362impl ConnectOptions {
1363 pub fn new() -> Self {
1368 Self::default()
1369 }
1370
1371 pub fn with_transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
1373 self.transport_config = Some(transport_config);
1374 self
1375 }
1376
1377 pub fn with_additional_alpns(mut self, alpns: Vec<Vec<u8>>) -> Self {
1398 self.additional_alpns = alpns;
1399 self
1400 }
1401}
1402
1403fn proxy_url_from_env() -> Option<Url> {
1410 if let Some(url) = std::env::var("HTTP_PROXY")
1411 .ok()
1412 .and_then(|s| s.parse::<Url>().ok())
1413 {
1414 if is_cgi() {
1415 warn!("HTTP_PROXY environment variable ignored in CGI");
1416 } else {
1417 return Some(url);
1418 }
1419 }
1420 if let Some(url) = std::env::var("http_proxy")
1421 .ok()
1422 .and_then(|s| s.parse::<Url>().ok())
1423 {
1424 return Some(url);
1425 }
1426 if let Some(url) = std::env::var("HTTPS_PROXY")
1427 .ok()
1428 .and_then(|s| s.parse::<Url>().ok())
1429 {
1430 return Some(url);
1431 }
1432 if let Some(url) = std::env::var("https_proxy")
1433 .ok()
1434 .and_then(|s| s.parse::<Url>().ok())
1435 {
1436 return Some(url);
1437 }
1438
1439 None
1440}
1441
1442#[derive(Debug, Clone, PartialEq, Eq)]
1444pub enum RelayMode {
1445 Disabled,
1448 Default,
1452 Staging,
1454 Custom(RelayMap),
1456}
1457
1458impl RelayMode {
1459 pub fn relay_map(&self) -> RelayMap {
1461 match self {
1462 RelayMode::Disabled => RelayMap::empty(),
1463 RelayMode::Default => crate::defaults::prod::default_relay_map(),
1464 RelayMode::Staging => crate::defaults::staging::default_relay_map(),
1465 RelayMode::Custom(relay_map) => relay_map.clone(),
1466 }
1467 }
1468
1469 pub fn custom(map: impl IntoIterator<Item = RelayUrl>) -> Self {
1483 let m = RelayMap::from_iter(map);
1484 Self::Custom(m)
1485 }
1486}
1487
1488pub const ENV_FORCE_STAGING_RELAYS: &str = "IROH_FORCE_STAGING_RELAYS";
1490
1491pub fn force_staging_infra() -> bool {
1493 matches!(std::env::var(ENV_FORCE_STAGING_RELAYS), Ok(value) if !value.is_empty())
1494}
1495
1496pub fn default_relay_mode() -> RelayMode {
1501 match force_staging_infra() {
1503 true => RelayMode::Staging,
1504 false => RelayMode::Default,
1505 }
1506}
1507
1508fn is_cgi() -> bool {
1513 std::env::var_os("REQUEST_METHOD").is_some()
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518 use std::{
1519 collections::BTreeMap,
1520 io,
1521 net::{IpAddr, Ipv4Addr, Ipv6Addr},
1522 str::FromStr,
1523 sync::Arc,
1524 time::{Duration, Instant},
1525 };
1526
1527 use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
1528 use n0_error::{AnyError as Error, Result, StdResultExt};
1529 use n0_future::{BufferedStreamExt, StreamExt, stream, task::AbortOnDropHandle, time};
1530 use n0_tracing_test::traced_test;
1531 use n0_watcher::Watcher;
1532 use rand::SeedableRng;
1533 use tokio::sync::oneshot;
1534 use tracing::{Instrument, debug_span, info, info_span, instrument};
1535
1536 use super::Endpoint;
1537 use crate::{
1538 RelayMap, RelayMode,
1539 address_lookup::memory::MemoryLookup,
1540 endpoint::{
1541 ApplicationClose, BindError, BindOpts, ConnectOptions, Connection, ConnectionError,
1542 PathInfo,
1543 },
1544 protocol::{AcceptError, ProtocolHandler, Router},
1545 test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
1546 };
1547
1548 const TEST_ALPN: &[u8] = b"n0/iroh/test";
1549
1550 #[tokio::test]
1551 #[traced_test]
1552 async fn test_connect_self() -> Result {
1553 let ep = Endpoint::empty_builder(RelayMode::Disabled)
1554 .alpns(vec![TEST_ALPN.to_vec()])
1555 .bind()
1556 .await
1557 .unwrap();
1558 let my_addr = ep.addr();
1559 let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
1560 assert!(res.is_err());
1561 let err = res.err().unwrap();
1562 assert!(err.to_string().starts_with("Connecting to ourself"));
1563
1564 Ok(())
1565 }
1566
1567 #[tokio::test]
1568 #[traced_test]
1569 async fn endpoint_connect_close() -> Result {
1570 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1571 let (relay_map, relay_url, _guard) = run_relay_server().await?;
1572 let server_secret_key = SecretKey::generate(&mut rng);
1573 let server_peer_id = server_secret_key.public();
1574
1575 let qlog = QlogFileGroup::from_env("endpoint_connect_close");
1576
1577 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1579 .secret_key(server_secret_key)
1580 .transport_config(qlog.create("server")?)
1581 .alpns(vec![TEST_ALPN.to_vec()])
1582 .insecure_skip_relay_cert_verify(true)
1583 .bind()
1584 .await?;
1585 ep.online().await;
1587
1588 let server = tokio::spawn(
1589 async move {
1590 info!("accepting connection");
1591 let incoming = ep.accept().await.anyerr()?;
1592 let conn = incoming.await.anyerr()?;
1593 let mut stream = conn.accept_uni().await.anyerr()?;
1594 let mut buf = [0u8; 5];
1595 stream.read_exact(&mut buf).await.anyerr()?;
1596 info!("Accepted 1 stream, received {buf:?}. Closing now.");
1597 conn.close(7u8.into(), b"bye");
1599
1600 let res = conn.accept_uni().await;
1601 assert_eq!(res.unwrap_err(), ConnectionError::LocallyClosed);
1602
1603 let res = stream.read_to_end(10).await;
1604 assert_eq!(
1605 res.unwrap_err(),
1606 quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
1607 ConnectionError::LocallyClosed
1608 ))
1609 );
1610 info!("server test completed");
1611 Ok::<_, Error>(())
1612 }
1613 .instrument(info_span!("test-server")),
1614 );
1615
1616 let client = tokio::spawn(
1617 async move {
1618 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1619 .alpns(vec![TEST_ALPN.to_vec()])
1620 .insecure_skip_relay_cert_verify(true)
1621 .transport_config(qlog.create("client")?)
1622 .bind()
1623 .await?;
1624 info!("client connecting");
1625 let endpoint_addr = EndpointAddr::new(server_peer_id).with_relay_url(relay_url);
1626 let conn = ep.connect(endpoint_addr, TEST_ALPN).await?;
1627 let mut stream = conn.open_uni().await.anyerr()?;
1628
1629 stream.write_all(b"hello").await.anyerr()?;
1634
1635 info!("waiting for closed");
1636 let err = conn.closed().await;
1638 let expected_err = ConnectionError::ApplicationClosed(ApplicationClose {
1639 error_code: 7u8.into(),
1640 reason: b"bye".to_vec().into(),
1641 });
1642 assert_eq!(err, expected_err);
1643
1644 info!("opening new - expect it to fail");
1645 let res = conn.open_uni().await;
1646 assert_eq!(res.unwrap_err(), expected_err);
1647 info!("client test completed");
1648 Ok::<_, Error>(())
1649 }
1650 .instrument(info_span!("test-client")),
1651 );
1652
1653 let (server, client) = tokio::time::timeout(
1654 Duration::from_secs(30),
1655 n0_future::future::zip(server, client),
1656 )
1657 .await
1658 .anyerr()?;
1659 server.anyerr()??;
1660 client.anyerr()??;
1661 Ok(())
1662 }
1663
1664 #[tokio::test]
1665 #[traced_test]
1666 async fn endpoint_relay_connect_loop() -> Result {
1667 let test_start = Instant::now();
1668 let n_clients = 5;
1669 let n_chunks_per_client = 2;
1670 let chunk_size = 100;
1671 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
1672 let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
1673 let server_secret_key = SecretKey::generate(&mut rng);
1674 let server_endpoint_id = server_secret_key.public();
1675
1676 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1678 .insecure_skip_relay_cert_verify(true)
1679 .secret_key(server_secret_key)
1680 .alpns(vec![TEST_ALPN.to_vec()])
1681 .bind()
1682 .await?;
1683 ep.online().await;
1685
1686 info!(time = ?test_start.elapsed(), "test setup done");
1687
1688 let server = tokio::spawn(
1690 async move {
1691 let eps = ep.bound_sockets();
1692
1693 info!(me = %ep.id().fmt_short(), eps = ?eps, "server listening on");
1694 for i in 0..n_clients {
1695 tokio::time::timeout(Duration::from_secs(4), async {
1696 let round_start = Instant::now();
1697 info!("[server] round {i}");
1698 let incoming = ep.accept().await.anyerr()?;
1699 let conn = incoming.await.anyerr()?;
1700 let endpoint_id = conn.remote_id();
1701 info!(%i, peer = %endpoint_id.fmt_short(), "accepted connection");
1702 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1703 let mut buf = vec![0u8; chunk_size];
1704 for _i in 0..n_chunks_per_client {
1705 recv.read_exact(&mut buf).await.anyerr()?;
1706 send.write_all(&buf).await.anyerr()?;
1707 }
1708 info!(%i, peer = %endpoint_id.fmt_short(), "finishing");
1709 send.finish().anyerr()?;
1710 conn.closed().await; info!(%i, peer = %endpoint_id.fmt_short(), "finished");
1712 info!("[server] round {i} done in {:?}", round_start.elapsed());
1713 Ok::<_, Error>(())
1714 })
1715 .await
1716 .std_context("timeout")??;
1717 }
1718 Ok::<_, Error>(())
1719 }
1720 .instrument(debug_span!("server")),
1721 );
1722
1723 let client = tokio::spawn(async move {
1724 for i in 0..n_clients {
1725 let round_start = Instant::now();
1726 info!("[client] round {i}");
1727 let client_secret_key = SecretKey::generate(&mut rng);
1728 tokio::time::timeout(
1729 Duration::from_secs(4),
1730 async {
1731 info!("client binding");
1732 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1733 .alpns(vec![TEST_ALPN.to_vec()])
1734 .insecure_skip_relay_cert_verify(true)
1735 .secret_key(client_secret_key)
1736 .bind()
1737 .await?;
1738 let eps = ep.bound_sockets();
1739
1740 info!(me = %ep.id().fmt_short(), eps=?eps, "client bound");
1741 let endpoint_addr =
1742 EndpointAddr::new(server_endpoint_id).with_relay_url(relay_url.clone());
1743 info!(to = ?endpoint_addr, "client connecting");
1744 let conn = ep.connect(endpoint_addr, TEST_ALPN).await.anyerr()?;
1745 info!("client connected");
1746 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1747
1748 for i in 0..n_chunks_per_client {
1749 let mut buf = vec![i; chunk_size];
1750 send.write_all(&buf).await.anyerr()?;
1751 recv.read_exact(&mut buf).await.anyerr()?;
1752 assert_eq!(buf, vec![i; chunk_size]);
1753 }
1754 conn.close(0u32.into(), b"bye!");
1756 info!("client finished");
1757 ep.close().await;
1758 info!("client closed");
1759
1760 Ok::<_, Error>(())
1761 }
1762 .instrument(debug_span!("client", %i)),
1763 )
1764 .await
1765 .std_context("timeout")??;
1766 info!("[client] round {i} done in {:?}", round_start.elapsed());
1767 }
1768 Ok::<_, Error>(())
1769 });
1770
1771 server.await.anyerr()??;
1772 client.await.anyerr()??;
1773 Ok(())
1774 }
1775
1776 #[tokio::test]
1777 #[traced_test]
1778 async fn endpoint_send_relay() -> Result {
1779 let (relay_map, _relay_url, _guard) = run_relay_server().await?;
1780 let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1781 .insecure_skip_relay_cert_verify(true)
1782 .bind()
1783 .await?;
1784 let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1785 .insecure_skip_relay_cert_verify(true)
1786 .alpns(vec![TEST_ALPN.to_vec()])
1787 .bind()
1788 .await?;
1789
1790 let task = tokio::spawn({
1791 let server = server.clone();
1792 async move {
1793 let Some(conn) = server.accept().await else {
1794 n0_error::bail_any!("Expected an incoming connection");
1795 };
1796 let conn = conn.await.anyerr()?;
1797 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1798 let data = recv.read_to_end(1000).await.anyerr()?;
1799 send.write_all(&data).await.anyerr()?;
1800 send.finish().anyerr()?;
1801 conn.closed().await;
1802
1803 Ok::<_, Error>(())
1804 }
1805 });
1806
1807 let addr = server.addr();
1808 let conn = client.connect(addr, TEST_ALPN).await?;
1809 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1810 send.write_all(b"Hello, world!").await.anyerr()?;
1811 send.finish().anyerr()?;
1812 let data = recv.read_to_end(1000).await.anyerr()?;
1813 conn.close(0u32.into(), b"bye!");
1814
1815 task.await.anyerr()??;
1816
1817 client.close().await;
1818 server.close().await;
1819
1820 assert_eq!(&data, b"Hello, world!");
1821
1822 Ok(())
1823 }
1824
1825 #[tokio::test]
1826 #[traced_test]
1827 async fn endpoint_two_direct_only() -> Result {
1828 let ep1 = {
1831 let span = info_span!("server");
1832 let _guard = span.enter();
1833 Endpoint::builder()
1834 .alpns(vec![TEST_ALPN.to_vec()])
1835 .relay_mode(RelayMode::Disabled)
1836 .bind()
1837 .await?
1838 };
1839 let ep2 = {
1840 let span = info_span!("client");
1841 let _guard = span.enter();
1842 Endpoint::builder()
1843 .alpns(vec![TEST_ALPN.to_vec()])
1844 .relay_mode(RelayMode::Disabled)
1845 .bind()
1846 .await?
1847 };
1848 let ep1_nodeaddr = ep1.addr();
1849
1850 #[instrument(name = "client", skip_all)]
1851 async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<ConnectionError> {
1852 info!(me = %ep.id().fmt_short(), "client starting");
1853 let conn = ep.connect(dst, TEST_ALPN).await?;
1854 let mut send = conn.open_uni().await.anyerr()?;
1855 send.write_all(b"hello").await.anyerr()?;
1856 send.finish().anyerr()?;
1857 Ok(conn.closed().await)
1858 }
1859
1860 #[instrument(name = "server", skip_all)]
1861 async fn accept(ep: Endpoint, src: EndpointId) -> Result {
1862 info!(me = %ep.id().fmt_short(), "server starting");
1863 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1864 let node_id = conn.remote_id();
1865 assert_eq!(node_id, src);
1866 let mut recv = conn.accept_uni().await.anyerr()?;
1867 let msg = recv.read_to_end(100).await.anyerr()?;
1868 assert_eq!(msg, b"hello");
1869 Ok(())
1871 }
1872
1873 let ep1_accept = tokio::spawn(accept(ep1.clone(), ep2.id()));
1874 let ep2_connect = tokio::spawn(connect(ep2.clone(), ep1_nodeaddr));
1875
1876 ep1_accept.await.anyerr()??;
1877 let conn_closed = dbg!(ep2_connect.await.anyerr()??);
1878 assert!(matches!(
1879 conn_closed,
1880 ConnectionError::ApplicationClosed(ApplicationClose { .. })
1881 ));
1882
1883 Ok(())
1884 }
1885
1886 #[tokio::test]
1887 #[traced_test]
1888 async fn endpoint_two_relay_only_becomes_direct() -> Result {
1889 let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1892 let (node_addr_tx, node_addr_rx) = oneshot::channel();
1893 let qlog = Arc::new(QlogFileGroup::from_env("two_relay_only_becomes_direct"));
1894
1895 #[instrument(name = "client", skip_all)]
1896 async fn connect(
1897 relay_map: RelayMap,
1898 node_addr_rx: oneshot::Receiver<EndpointAddr>,
1899 qlog: Arc<QlogFileGroup>,
1900 ) -> Result<ConnectionError> {
1901 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1902 let secret = SecretKey::generate(&mut rng);
1903 let ep = Endpoint::builder()
1904 .secret_key(secret)
1905 .alpns(vec![TEST_ALPN.to_vec()])
1906 .insecure_skip_relay_cert_verify(true)
1907 .relay_mode(RelayMode::Custom(relay_map))
1908 .transport_config(qlog.create("client")?)
1909 .bind()
1910 .await?;
1911 info!(me = %ep.id().fmt_short(), "client starting");
1912 let dst = node_addr_rx.await.anyerr()?;
1913
1914 info!(me = %ep.id().fmt_short(), "client connecting");
1915 let conn = ep.connect(dst, TEST_ALPN).await?;
1916 let mut send = conn.open_uni().await.anyerr()?;
1917 send.write_all(b"hello").await.anyerr()?;
1918 let mut paths = conn.paths().stream();
1919 info!("Waiting for direct connection");
1920 while let Some(infos) = paths.next().await {
1921 info!(?infos, "new PathInfos");
1922 if infos.iter().any(|info| info.is_ip()) {
1923 break;
1924 }
1925 }
1926 info!("Have direct connection");
1927 assert_eq!(ep.metrics().socket.num_conns_opened.get(), 1);
1929 assert_eq!(ep.metrics().socket.num_conns_direct.get(), 1);
1930
1931 send.write_all(b"close please").await.anyerr()?;
1932 send.finish().anyerr()?;
1933
1934 Ok(conn.closed().await)
1935 }
1936
1937 #[instrument(name = "server", skip_all)]
1938 async fn accept(
1939 relay_map: RelayMap,
1940 node_addr_tx: oneshot::Sender<EndpointAddr>,
1941 qlog: Arc<QlogFileGroup>,
1942 ) -> Result {
1943 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
1944 let secret = SecretKey::generate(&mut rng);
1945 let ep = Endpoint::builder()
1946 .secret_key(secret)
1947 .alpns(vec![TEST_ALPN.to_vec()])
1948 .insecure_skip_relay_cert_verify(true)
1949 .transport_config(qlog.create("server")?)
1950 .relay_mode(RelayMode::Custom(relay_map))
1951 .bind()
1952 .await?;
1953 ep.online().await;
1954 let mut node_addr = ep.addr();
1955 node_addr.addrs.retain(|addr| addr.is_relay());
1956 node_addr_tx.send(node_addr).unwrap();
1957
1958 info!(me = %ep.id().fmt_short(), "server starting");
1959 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1960 let mut recv = conn.accept_uni().await.anyerr()?;
1963 let mut msg = [0u8; 5];
1964 recv.read_exact(&mut msg).await.anyerr()?;
1965 assert_eq!(&msg, b"hello");
1966 info!("received hello");
1967 let msg = recv.read_to_end(100).await.anyerr()?;
1968 assert_eq!(msg, b"close please");
1969 info!("received 'close please'");
1970 Ok(())
1972 }
1973
1974 let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx, qlog.clone()));
1975 let client_task = tokio::spawn(connect(relay_map, node_addr_rx, qlog));
1976
1977 server_task.await.anyerr()??;
1978 let conn_closed = dbg!(client_task.await.anyerr()??);
1979 assert!(matches!(
1980 conn_closed,
1981 ConnectionError::ApplicationClosed(ApplicationClose { .. })
1982 ));
1983
1984 Ok(())
1985 }
1986
1987 #[tokio::test]
1988 #[traced_test]
1989 async fn endpoint_two_relay_only_no_ip() -> Result {
1990 let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1993 let (node_addr_tx, node_addr_rx) = oneshot::channel();
1994
1995 #[instrument(name = "client", skip_all)]
1996 async fn connect(
1997 relay_map: RelayMap,
1998 node_addr_rx: oneshot::Receiver<EndpointAddr>,
1999 ) -> Result<ConnectionError> {
2000 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
2001 let secret = SecretKey::generate(&mut rng);
2002 let ep = Endpoint::builder()
2003 .secret_key(secret)
2004 .alpns(vec![TEST_ALPN.to_vec()])
2005 .insecure_skip_relay_cert_verify(true)
2006 .relay_mode(RelayMode::Custom(relay_map))
2007 .clear_ip_transports() .bind()
2009 .await?;
2010 info!(me = %ep.id().fmt_short(), "client starting");
2011 let dst = node_addr_rx.await.anyerr()?;
2012
2013 info!(me = %ep.id().fmt_short(), "client connecting");
2014 let conn = ep.connect(dst, TEST_ALPN).await?;
2015 let mut send = conn.open_uni().await.anyerr()?;
2016 send.write_all(b"hello").await.anyerr()?;
2017 let mut paths = conn.paths().stream();
2018 info!("Waiting for connection");
2019 'outer: while let Some(infos) = paths.next().await {
2020 info!(?infos, "new PathInfos");
2021 for info in infos {
2022 if info.is_ip() {
2023 panic!("should not happen: {:?}", info);
2024 }
2025 if info.is_relay() {
2026 break 'outer;
2027 }
2028 }
2029 }
2030 info!("Have relay connection");
2031 send.write_all(b"close please").await.anyerr()?;
2032 send.finish().anyerr()?;
2033 Ok(conn.closed().await)
2034 }
2035
2036 #[instrument(name = "server", skip_all)]
2037 async fn accept(
2038 relay_map: RelayMap,
2039 node_addr_tx: oneshot::Sender<EndpointAddr>,
2040 ) -> Result {
2041 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
2042 let secret = SecretKey::generate(&mut rng);
2043 let ep = Endpoint::builder()
2044 .secret_key(secret)
2045 .alpns(vec![TEST_ALPN.to_vec()])
2046 .insecure_skip_relay_cert_verify(true)
2047 .relay_mode(RelayMode::Custom(relay_map))
2048 .clear_ip_transports()
2049 .bind()
2050 .await?;
2051 ep.online().await;
2052 let node_addr = ep.addr();
2053 node_addr_tx.send(node_addr).unwrap();
2054
2055 info!(me = %ep.id().fmt_short(), "server starting");
2056 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
2057 let mut recv = conn.accept_uni().await.anyerr()?;
2060 let mut msg = [0u8; 5];
2061 recv.read_exact(&mut msg).await.anyerr()?;
2062 assert_eq!(&msg, b"hello");
2063 info!("received hello");
2064 let msg = recv.read_to_end(100).await.anyerr()?;
2065 assert_eq!(msg, b"close please");
2066 info!("received 'close please'");
2067 Ok(())
2069 }
2070
2071 let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
2072 let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
2073
2074 server_task.await.anyerr()??;
2075 let conn_closed = dbg!(client_task.await.anyerr()??);
2076 assert!(matches!(
2077 conn_closed,
2078 ConnectionError::ApplicationClosed(ApplicationClose { .. })
2079 ));
2080
2081 Ok(())
2082 }
2083
2084 #[tokio::test]
2085 #[traced_test]
2086 async fn endpoint_two_direct_add_relay() -> Result {
2087 let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
2090 let (node_addr_tx, node_addr_rx) = oneshot::channel();
2091
2092 #[instrument(name = "client", skip_all)]
2093 async fn connect(
2094 relay_map: RelayMap,
2095 node_addr_rx: oneshot::Receiver<EndpointAddr>,
2096 ) -> Result<()> {
2097 let secret = SecretKey::from([0u8; 32]);
2098 let ep = Endpoint::builder()
2099 .secret_key(secret)
2100 .alpns(vec![TEST_ALPN.to_vec()])
2101 .insecure_skip_relay_cert_verify(true)
2102 .relay_mode(RelayMode::Custom(relay_map))
2103 .bind()
2104 .await?;
2105 info!(me = %ep.id().fmt_short(), "client starting");
2106 let dst = node_addr_rx.await.anyerr()?;
2107
2108 info!(me = %ep.id().fmt_short(), "client connecting");
2109 let conn = ep.connect(dst, TEST_ALPN).await?;
2110 info!(me = %ep.id().fmt_short(), "client connected");
2111
2112 let path_info = conn.paths().get();
2115 assert_eq!(path_info.len(), 1);
2116 assert!(path_info.iter().next().unwrap().is_ip());
2117
2118 let mut paths = conn.paths().stream();
2119 time::timeout(Duration::from_secs(5), async move {
2120 while let Some(infos) = paths.next().await {
2121 info!(?infos, "new PathInfos");
2122 if infos.iter().any(|info| info.is_relay()) {
2123 info!("client has a relay path");
2124 break;
2125 }
2126 }
2127 })
2128 .await
2129 .anyerr()?;
2130
2131 let mut stream = conn.accept_uni().await.anyerr()?;
2133 stream.read_to_end(100).await.anyerr()?;
2134
2135 info!("client closing");
2136 conn.close(0u8.into(), b"");
2137 ep.close().await;
2138 Ok(())
2139 }
2140
2141 #[instrument(name = "server", skip_all)]
2142 async fn accept(
2143 relay_map: RelayMap,
2144 node_addr_tx: oneshot::Sender<EndpointAddr>,
2145 ) -> Result<ConnectionError> {
2146 let secret = SecretKey::from([1u8; 32]);
2147 let ep = Endpoint::builder()
2148 .secret_key(secret)
2149 .alpns(vec![TEST_ALPN.to_vec()])
2150 .insecure_skip_relay_cert_verify(true)
2151 .relay_mode(RelayMode::Custom(relay_map))
2152 .bind()
2153 .await?;
2154 ep.online().await;
2155 let node_addr = ep.addr();
2156 node_addr_tx.send(node_addr).unwrap();
2157
2158 info!(me = %ep.id().fmt_short(), "server starting");
2159 let conn = ep.accept().await.anyerr()?.await.anyerr()?;
2160 info!(me = %ep.id().fmt_short(), "server accepted connection");
2161
2162 let mut paths = conn.paths().stream();
2166 time::timeout(Duration::from_secs(5), async move {
2167 while let Some(infos) = paths.next().await {
2168 info!(?infos, "new PathInfos");
2169 if infos.iter().any(|path| path.is_relay()) {
2170 info!("server has a relay path");
2171 break;
2172 }
2173 }
2174 })
2175 .await
2176 .anyerr()?;
2177
2178 let mut stream = conn.open_uni().await.anyerr()?;
2179 stream.write_all(b"have relay").await.anyerr()?;
2180 stream.finish().anyerr()?;
2181 info!("waiting conn.closed()");
2182
2183 Ok(conn.closed().await)
2184 }
2185
2186 let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
2187 let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
2188
2189 client_task.await.anyerr()??;
2190 let conn_closed = dbg!(server_task.await.anyerr()??);
2191 assert!(matches!(
2192 conn_closed,
2193 ConnectionError::ApplicationClosed(ApplicationClose { .. })
2194 ));
2195
2196 Ok(())
2197 }
2198
2199 #[tokio::test]
2200 #[traced_test]
2201 async fn endpoint_relay_map_change() -> Result {
2202 let (relay_map, relay_url, _guard1) = run_relay_server().await?;
2203 let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
2204 .insecure_skip_relay_cert_verify(true)
2205 .bind()
2206 .await?;
2207 let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2208 .insecure_skip_relay_cert_verify(true)
2209 .alpns(vec![TEST_ALPN.to_vec()])
2210 .bind()
2211 .await?;
2212
2213 let task = tokio::spawn({
2214 let server = server.clone();
2215 async move {
2216 for i in 0..2 {
2217 println!("accept: round {i}");
2218 let Some(conn) = server.accept().await else {
2219 n0_error::bail_any!("Expected an incoming connection");
2220 };
2221 let conn = conn.await.anyerr()?;
2222 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2223 let data = recv.read_to_end(1000).await.anyerr()?;
2224 send.write_all(&data).await.anyerr()?;
2225 send.finish().anyerr()?;
2226 conn.closed().await;
2227 }
2228 Ok::<_, Error>(())
2229 }
2230 });
2231
2232 server.online().await;
2233
2234 let mut addr = server.addr();
2235 println!("round1: {:?}", addr);
2236
2237 addr.addrs
2239 .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2240
2241 let conn = client.connect(addr, TEST_ALPN).await?;
2242 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2243 send.write_all(b"Hello, world!").await.anyerr()?;
2244 send.finish().anyerr()?;
2245 let data = recv.read_to_end(1000).await.anyerr()?;
2246 conn.close(0u32.into(), b"bye!");
2247
2248 assert_eq!(&data, b"Hello, world!");
2249
2250 let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
2252 let new_endpoint = new_relay_map
2253 .get(&new_relay_url)
2254 .expect("missing endpoint")
2255 .clone();
2256 dbg!(&new_relay_map);
2257
2258 let addr_watcher = server.watch_addr();
2259
2260 assert!(
2262 server
2263 .insert_relay(new_relay_url.clone(), new_endpoint.clone())
2264 .await
2265 .is_none()
2266 );
2267 assert!(server.remove_relay(&relay_url).await.is_some());
2269
2270 println!("------- changed ----- ");
2271
2272 let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
2273 let mut stream = addr_watcher.stream();
2274 while let Some(addr) = stream.next().await {
2275 if addr.relay_urls().next() != Some(&relay_url) {
2276 return addr;
2277 }
2278 }
2279 panic!("failed to change relay");
2280 })
2281 .await
2282 .anyerr()?;
2283
2284 println!("round2: {:?}", addr);
2285 assert_eq!(addr.relay_urls().next(), Some(&new_relay_url));
2286
2287 addr.addrs
2289 .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2290
2291 let conn = client.connect(addr, TEST_ALPN).await?;
2292 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2293 send.write_all(b"Hello, world!").await.anyerr()?;
2294 send.finish().anyerr()?;
2295 let data = recv.read_to_end(1000).await.anyerr()?;
2296 conn.close(0u32.into(), b"bye!");
2297
2298 task.await.anyerr()??;
2299
2300 client.close().await;
2301 server.close().await;
2302
2303 assert_eq!(&data, b"Hello, world!");
2304
2305 Ok(())
2306 }
2307
2308 #[tokio::test]
2309 #[traced_test]
2310 async fn endpoint_bidi_send_recv() -> Result {
2311 let disco = MemoryLookup::new();
2312 let ep1 = Endpoint::empty_builder(RelayMode::Disabled)
2313 .address_lookup(disco.clone())
2314 .alpns(vec![TEST_ALPN.to_vec()])
2315 .bind()
2316 .await?;
2317
2318 let ep2 = Endpoint::empty_builder(RelayMode::Disabled)
2319 .address_lookup(disco.clone())
2320 .alpns(vec![TEST_ALPN.to_vec()])
2321 .bind()
2322 .await?;
2323
2324 disco.add_endpoint_info(ep1.addr());
2325 disco.add_endpoint_info(ep2.addr());
2326
2327 let ep1_endpointid = ep1.id();
2328 let ep2_endpointid = ep2.id();
2329 eprintln!("endpoint id 1 {ep1_endpointid}");
2330 eprintln!("endpoint id 2 {ep2_endpointid}");
2331
2332 async fn connect_hello(ep: Endpoint, dst: EndpointId) -> Result {
2333 let conn = ep.connect(dst, TEST_ALPN).await?;
2334 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2335 info!("sending hello");
2336 send.write_all(b"hello").await.anyerr()?;
2337 send.finish().anyerr()?;
2338 info!("receiving world");
2339 let m = recv.read_to_end(100).await.anyerr()?;
2340 assert_eq!(m, b"world");
2341 conn.close(1u8.into(), b"done");
2342 Ok(())
2343 }
2344
2345 async fn accept_world(ep: Endpoint, src: EndpointId) -> Result {
2346 let incoming = ep.accept().await.anyerr()?;
2347 let mut iconn = incoming.accept().anyerr()?;
2348 let alpn = iconn.alpn().await?;
2349 let conn = iconn.await.anyerr()?;
2350 let endpoint_id = conn.remote_id();
2351 assert_eq!(endpoint_id, src);
2352 assert_eq!(alpn, TEST_ALPN);
2353 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2354 info!("receiving hello");
2355 let m = recv.read_to_end(100).await.anyerr()?;
2356 assert_eq!(m, b"hello");
2357 info!("sending hello");
2358 send.write_all(b"world").await.anyerr()?;
2359 send.finish().anyerr()?;
2360 match conn.closed().await {
2361 ConnectionError::ApplicationClosed(closed) => {
2362 assert_eq!(closed.error_code, 1u8.into());
2363 Ok(())
2364 }
2365 _ => panic!("wrong close error"),
2366 }
2367 }
2368
2369 let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_endpointid).instrument(
2370 info_span!(
2371 "p1_accept",
2372 ep1 = %ep1.id().fmt_short(),
2373 dst = %ep2_endpointid.fmt_short(),
2374 ),
2375 ));
2376 let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_endpointid).instrument(
2377 info_span!(
2378 "p2_accept",
2379 ep2 = %ep2.id().fmt_short(),
2380 dst = %ep1_endpointid.fmt_short(),
2381 ),
2382 ));
2383 let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_endpointid).instrument(
2384 info_span!(
2385 "p1_connect",
2386 ep1 = %ep1.id().fmt_short(),
2387 dst = %ep2_endpointid.fmt_short(),
2388 ),
2389 ));
2390 let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_endpointid).instrument(
2391 info_span!(
2392 "p2_connect",
2393 ep2 = %ep2.id().fmt_short(),
2394 dst = %ep1_endpointid.fmt_short(),
2395 ),
2396 ));
2397
2398 p1_accept.await.anyerr()??;
2399 p2_accept.await.anyerr()??;
2400 p1_connect.await.anyerr()??;
2401 p2_connect.await.anyerr()??;
2402
2403 Ok(())
2404 }
2405
2406 #[tokio::test]
2407 #[traced_test]
2408 async fn test_direct_addresses_no_qad_relay() -> Result {
2409 let (relay_map, _, _guard) = run_relay_server_with(false).await.unwrap();
2410
2411 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2412 .alpns(vec![TEST_ALPN.to_vec()])
2413 .insecure_skip_relay_cert_verify(true)
2414 .bind()
2415 .await?;
2416
2417 assert!(ep.addr().ip_addrs().count() > 0);
2418
2419 Ok(())
2420 }
2421
2422 #[cfg_attr(target_os = "windows", ignore = "flaky")]
2423 #[tokio::test]
2424 #[traced_test]
2425 async fn graceful_close() -> Result {
2426 let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2427 let server = Endpoint::empty_builder(RelayMode::Disabled)
2428 .alpns(vec![TEST_ALPN.to_vec()])
2429 .bind()
2430 .await?;
2431 let server_addr = server.addr();
2432 let server_task = tokio::spawn(async move {
2433 let incoming = server.accept().await.anyerr()?;
2434 let conn = incoming.await.anyerr()?;
2435 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2436 let msg = recv.read_to_end(1_000).await.anyerr()?;
2437 send.write_all(&msg).await.anyerr()?;
2438 send.finish().anyerr()?;
2439 let close_reason = conn.closed().await;
2440 Ok::<_, Error>(close_reason)
2441 });
2442
2443 let conn = client.connect(server_addr, TEST_ALPN).await?;
2444 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2445 send.write_all(b"Hello, world!").await.anyerr()?;
2446 send.finish().anyerr()?;
2447 recv.read_to_end(1_000).await.anyerr()?;
2448 conn.close(42u32.into(), b"thanks, bye!");
2449 client.close().await;
2450
2451 let close_err = server_task.await.anyerr()??;
2452 let ConnectionError::ApplicationClosed(app_close) = close_err else {
2453 panic!("Unexpected close reason: {close_err:?}");
2454 };
2455
2456 assert_eq!(app_close.error_code, 42u32.into());
2457 assert_eq!(app_close.reason.as_ref(), b"thanks, bye!");
2458
2459 Ok(())
2460 }
2461
2462 #[cfg(feature = "metrics")]
2463 #[tokio::test]
2464 #[traced_test]
2465 async fn metrics_smoke() -> Result {
2466 use iroh_metrics::Registry;
2467
2468 let secret_key = SecretKey::from_bytes(&[0u8; 32]);
2469 let client = Endpoint::empty_builder(RelayMode::Disabled)
2470 .secret_key(secret_key)
2471 .bind()
2472 .await?;
2473 let secret_key = SecretKey::from_bytes(&[1u8; 32]);
2474 let server = Endpoint::empty_builder(RelayMode::Disabled)
2475 .secret_key(secret_key)
2476 .alpns(vec![TEST_ALPN.to_vec()])
2477 .bind()
2478 .await?;
2479 let server_addr = server.addr();
2480 let server_task = tokio::task::spawn(async move {
2481 let conn = server.accept().await.anyerr()?.await.anyerr()?;
2482 let mut uni = conn.accept_uni().await.anyerr()?;
2483 uni.read_to_end(10).await.anyerr()?;
2484 drop(conn);
2485 Ok::<_, Error>(server)
2486 });
2487 let conn = client.connect(server_addr, TEST_ALPN).await?;
2488 let mut uni = conn.open_uni().await.anyerr()?;
2489 uni.write_all(b"helloworld").await.anyerr()?;
2490 uni.finish().anyerr()?;
2491 conn.closed().await;
2492 drop(conn);
2493 let server = server_task.await.anyerr()??;
2494
2495 let m = client.metrics();
2496 assert!(m.socket.recv_datagrams.get() > 0);
2501
2502 let m = server.metrics();
2503 assert!(m.socket.recv_datagrams.get() > 0);
2508
2509 fn register_endpoint(registry: &mut Registry, endpoint: &Endpoint) {
2511 let id = endpoint.id().fmt_short();
2512 let sub_registry = registry.sub_registry_with_label("id", id.to_string());
2513 sub_registry.register_all(endpoint.metrics());
2514 }
2515 let mut registry = Registry::default();
2516 register_endpoint(&mut registry, &client);
2517 register_endpoint(&mut registry, &server);
2518 Ok(())
2522 }
2523
2524 async fn alpn_connection_test(
2527 accept_alpns: Vec<Vec<u8>>,
2528 primary_connect_alpn: &[u8],
2529 secondary_connect_alpns: Vec<Vec<u8>>,
2530 ) -> Result<Vec<u8>> {
2531 let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2532 let server = Endpoint::empty_builder(RelayMode::Disabled)
2533 .alpns(accept_alpns)
2534 .bind()
2535 .await?;
2536 let server_addr = server.addr();
2537 let server_task = tokio::spawn({
2538 let server = server.clone();
2539 async move {
2540 let incoming = server.accept().await.anyerr()?;
2541 let conn = incoming.await.anyerr()?;
2542 conn.close(0u32.into(), b"bye!");
2543 n0_error::Ok(conn.alpn().to_vec())
2544 }
2545 });
2546
2547 let conn = client
2548 .connect_with_opts(
2549 server_addr,
2550 primary_connect_alpn,
2551 ConnectOptions::new().with_additional_alpns(secondary_connect_alpns),
2552 )
2553 .await?;
2554 let conn = conn.await.anyerr()?;
2555 let client_alpn = conn.alpn();
2556 conn.closed().await;
2557 client.close().await;
2558 server.close().await;
2559
2560 let server_alpn = server_task.await.anyerr()??;
2561
2562 assert_eq!(client_alpn, server_alpn);
2563
2564 Ok(server_alpn.to_vec())
2565 }
2566
2567 #[tokio::test]
2568 #[traced_test]
2569 async fn connect_multiple_alpn_negotiated() -> Result {
2570 const ALPN_ONE: &[u8] = b"alpn/1";
2571 const ALPN_TWO: &[u8] = b"alpn/2";
2572
2573 assert_eq!(
2574 alpn_connection_test(
2575 vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2577 ALPN_TWO,
2578 vec![ALPN_ONE.to_vec()],
2579 )
2580 .await?,
2581 ALPN_TWO.to_vec(),
2582 "accept side prefers version 2 over 1"
2583 );
2584
2585 assert_eq!(
2586 alpn_connection_test(
2587 vec![ALPN_ONE.to_vec()],
2589 ALPN_TWO,
2590 vec![ALPN_ONE.to_vec()],
2591 )
2592 .await?,
2593 ALPN_ONE.to_vec(),
2594 "accept side only supports the old version"
2595 );
2596
2597 assert_eq!(
2598 alpn_connection_test(
2599 vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2600 ALPN_ONE,
2601 vec![ALPN_TWO.to_vec()],
2602 )
2603 .await?,
2604 ALPN_TWO.to_vec(),
2605 "connect side ALPN order doesn't matter"
2606 );
2607
2608 assert_eq!(
2609 alpn_connection_test(vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()], ALPN_ONE, vec![],)
2610 .await?,
2611 ALPN_ONE.to_vec(),
2612 "connect side only supports the old version"
2613 );
2614
2615 Ok(())
2616 }
2617
2618 #[tokio::test]
2619 #[traced_test]
2620 async fn watch_net_report() -> Result {
2621 let endpoint = Endpoint::empty_builder(RelayMode::Staging).bind().await?;
2622
2623 endpoint.net_report().updated().await.anyerr()?;
2625
2626 Ok(())
2627 }
2628
2629 #[tokio::test]
2635 #[traced_test]
2636 async fn connect_multi_time() -> Result {
2637 let n = 32;
2638
2639 const NOOP_ALPN: &[u8] = b"noop";
2640
2641 #[derive(Debug, Clone)]
2642 struct Noop;
2643
2644 impl ProtocolHandler for Noop {
2645 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
2646 connection.closed().await;
2647 Ok(())
2648 }
2649 }
2650
2651 async fn noop_server() -> Result<(Router, EndpointAddr)> {
2652 let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2653 .bind()
2654 .await
2655 .anyerr()?;
2656 let addr = endpoint.addr();
2657 let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
2658 Ok((router, addr))
2659 }
2660
2661 let routers = stream::iter(0..n)
2662 .map(|_| noop_server())
2663 .buffered_unordered(32)
2664 .collect::<Vec<_>>()
2665 .await
2666 .into_iter()
2667 .collect::<Result<Vec<_>, _>>()
2668 .anyerr()?;
2669
2670 let addrs = routers
2671 .iter()
2672 .map(|(_, addr)| addr.clone())
2673 .collect::<Vec<_>>();
2674 let ids = addrs.iter().map(|addr| addr.id).collect::<Vec<_>>();
2675 let address_lookup = MemoryLookup::from_endpoint_info(addrs);
2676 let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2677 .address_lookup(address_lookup)
2678 .bind()
2679 .await
2680 .anyerr()?;
2681 endpoint.addr();
2685 let t0 = Instant::now();
2686 for id in &ids {
2687 let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2688 conn.close(0u32.into(), b"done");
2689 }
2690 let dt0 = t0.elapsed().as_secs_f64();
2691 let t1 = Instant::now();
2692 for id in &ids {
2693 let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2694 conn.close(0u32.into(), b"done");
2695 }
2696 let dt1 = t1.elapsed().as_secs_f64();
2697
2698 assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
2699 Ok(())
2700 }
2701
2702 #[tokio::test]
2703 async fn test_custom_relay() -> Result {
2704 let _ep = Endpoint::empty_builder(RelayMode::custom([RelayUrl::from_str(
2705 "https://use1-1.relay.n0.iroh-canary.iroh.link.",
2706 )?]))
2707 .bind()
2708 .await?;
2709
2710 let relays = RelayMap::try_from_iter([
2711 "https://use1-1.relay.n0.iroh.iroh.link/",
2712 "https://euc1-1.relay.n0.iroh.iroh.link/",
2713 ])?;
2714 let _ep = Endpoint::empty_builder(RelayMode::Custom(relays))
2715 .bind()
2716 .await?;
2717
2718 Ok(())
2719 }
2720
2721 #[tokio::test]
2723 #[traced_test]
2724 async fn test_bind_addr_clear() -> Result {
2725 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2726 .clear_ip_transports()
2727 .bind_addr((Ipv4Addr::LOCALHOST, 0))?
2728 .bind()
2729 .await?;
2730 let bound_sockets = ep.bound_sockets();
2731 assert_eq!(bound_sockets.len(), 1);
2732 assert_eq!(bound_sockets[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2733 ep.close().await;
2734 Ok(())
2735 }
2736
2737 #[tokio::test]
2742 #[traced_test]
2743 async fn test_bind_addr_no_clear() -> Result {
2744 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2745 .bind_addr((Ipv4Addr::LOCALHOST, 0))?
2746 .bind()
2747 .await?;
2748 let bound_sockets = ep.bound_sockets();
2749 assert_eq!(bound_sockets.len(), 2);
2750 assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv4()).count(), 1);
2751 assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv6()).count(), 1);
2752 assert!(
2754 bound_sockets
2755 .iter()
2756 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2757 );
2758 ep.close().await;
2759 Ok(())
2760 }
2761
2762 #[tokio::test]
2767 #[traced_test]
2768 async fn test_bind_addr_default() -> Result {
2769 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2770 .bind_addr_with_opts(
2771 (Ipv4Addr::LOCALHOST, 0),
2772 BindOpts::default().set_is_default_route(true),
2773 )?
2774 .bind()
2775 .await?;
2776 let bound_sockets = ep.bound_sockets();
2777 assert_eq!(bound_sockets.len(), 2);
2778 assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv4()).count(), 1);
2779 assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv6()).count(), 1);
2780 assert!(
2781 bound_sockets
2782 .iter()
2783 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2784 );
2785 ep.close().await;
2786 drop(ep);
2787
2788 Ok(())
2789 }
2790
2791 #[tokio::test]
2796 #[traced_test]
2797 async fn test_bind_addr_nonzero_prefix() -> Result {
2798 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2799 .bind_addr_with_opts(
2800 (Ipv4Addr::LOCALHOST, 0),
2801 BindOpts::default().set_prefix_len(32),
2802 )?
2803 .bind()
2804 .await?;
2805 let bound_sockets = ep.bound_sockets();
2806 assert_eq!(bound_sockets.len(), 3);
2807 assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv4()).count(), 2);
2808 assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv6()).count(), 1);
2809 assert!(
2811 bound_sockets
2812 .iter()
2813 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED))
2814 );
2815 assert!(
2817 bound_sockets
2818 .iter()
2819 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2820 );
2821 ep.close().await;
2822 Ok(())
2823 }
2824
2825 #[tokio::test]
2829 #[traced_test]
2830 async fn test_bind_addr_badport() -> Result {
2831 let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2832 let port = socket.local_addr()?.port();
2833
2834 let res = Endpoint::empty_builder(RelayMode::Disabled)
2835 .clear_ip_transports()
2836 .bind_addr((Ipv4Addr::LOCALHOST, port))?
2837 .bind()
2838 .await;
2839
2840 assert!(matches!(
2841 res,
2842 Err(BindError::Sockets {
2843 source: io_error,
2844 ..
2845 })
2846 if io_error.kind() == io::ErrorKind::AddrInUse
2847 ));
2848 Ok(())
2849 }
2850
2851 #[tokio::test]
2855 #[traced_test]
2856 async fn test_bind_addr_badport_notrequired() -> Result {
2857 let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2858 let port = socket.local_addr()?.port();
2859
2860 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2861 .bind_addr_with_opts(
2862 (Ipv4Addr::LOCALHOST, port),
2863 BindOpts::default()
2864 .set_prefix_len(32)
2865 .set_is_required(false),
2866 )?
2867 .bind()
2868 .await?;
2869 let bound_sockets = ep.bound_sockets();
2870 assert_eq!(bound_sockets.len(), 2);
2872 assert!(
2874 !bound_sockets
2875 .iter()
2876 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2877 );
2878 Ok(())
2879 }
2880
2881 #[tokio::test]
2885 #[traced_test]
2886 async fn test_bind_addr_badport_default_notrequired() -> Result {
2887 let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2888 let port = socket.local_addr()?.port();
2889
2890 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2891 .bind_addr_with_opts(
2892 (Ipv4Addr::LOCALHOST, port),
2893 BindOpts::default().set_is_required(false),
2894 )?
2895 .bind()
2896 .await?;
2897 let bound_sockets = ep.bound_sockets();
2898 assert_eq!(bound_sockets.len(), 1);
2901 assert!(bound_sockets[0].is_ipv6());
2902 Ok(())
2903 }
2904
2905 #[tokio::test]
2909 #[traced_test]
2910 async fn test_bind_addr_badport_notrequired_no_other_transports() -> Result {
2911 let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2912 let port = socket.local_addr()?.port();
2913
2914 let res = Endpoint::empty_builder(RelayMode::Disabled)
2915 .clear_ip_transports()
2916 .bind_addr_with_opts(
2917 (Ipv4Addr::LOCALHOST, port),
2918 BindOpts::default().set_is_required(false),
2919 )?
2920 .bind()
2921 .await;
2922
2923 assert!(matches!(
2924 res,
2925 Err(BindError::CreateQuicEndpoint {
2926 source: io_error,
2927 ..
2928 })
2929 if io_error.kind() == io::ErrorKind::Other && io_error.to_string() == "no valid address available"
2930 ));
2931 Ok(())
2932 }
2933
2934 #[tokio::test]
2936 #[traced_test]
2937 async fn test_bind_addr_prefix_len_0_not_default() -> Result {
2938 let ep = Endpoint::empty_builder(RelayMode::Disabled)
2939 .bind_addr_with_opts(
2940 (Ipv4Addr::LOCALHOST, 0),
2941 BindOpts::default().set_is_default_route(false),
2942 )?
2943 .bind()
2944 .await?;
2945 let bound_sockets = ep.bound_sockets();
2946 assert_eq!(bound_sockets.len(), 3);
2949 assert!(
2950 bound_sockets
2951 .iter()
2952 .any(|x| x.ip() == IpAddr::V6(Ipv6Addr::UNSPECIFIED))
2953 );
2954 assert!(
2955 bound_sockets
2956 .iter()
2957 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED))
2958 );
2959 assert!(
2960 bound_sockets
2961 .iter()
2962 .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2963 );
2964 Ok(())
2965 }
2966
2967 #[ignore = "flaky"]
2968 #[tokio::test]
2969 #[traced_test]
2970 async fn connect_via_relay_becomes_direct_and_sends_direct() -> Result {
2971 let (relay_map, relay_url, _relay_server_guard) = run_relay_server().await?;
2972 let qlog = Arc::new(QlogFileGroup::from_env(
2973 "connect_via_relay_becomes_direct_and_sends_direct",
2974 ));
2975 let transfer_size = 1_000_000;
2976
2977 async fn collect_path_infos(conn: Connection) -> BTreeMap<TransportAddr, PathInfo> {
2978 let mut path_infos = BTreeMap::new();
2979 let mut paths = conn.paths().stream();
2980 while let Some(path_list) = paths.next().await {
2981 for path in path_list {
2982 path_infos.insert(path.remote_addr().clone(), path);
2983 }
2984 }
2985 path_infos
2986 }
2987
2988 let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
2989 .insecure_skip_relay_cert_verify(true)
2990 .transport_config(qlog.create("client")?)
2991 .bind()
2992 .await?;
2993 let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2994 .insecure_skip_relay_cert_verify(true)
2995 .transport_config(qlog.create("server")?)
2996 .alpns(vec![TEST_ALPN.to_vec()])
2997 .bind()
2998 .await?;
2999 let server_addr = EndpointAddr::new(server.id()).with_relay_url(relay_url);
3000 let server_task = tokio::spawn(async move {
3001 let incoming = server.accept().await.anyerr()?;
3002 let conn = incoming.await.anyerr()?;
3003 let stats_task = AbortOnDropHandle::new(tokio::spawn(collect_path_infos(conn.clone())));
3004 let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
3005 let msg = recv.read_to_end(transfer_size).await.anyerr()?;
3006 send.write_all(&msg).await.anyerr()?;
3007 send.finish().anyerr()?;
3008 conn.closed().await;
3009 let stats = stats_task.await.std_context("server stats task failed")?;
3010 Ok::<_, Error>(stats)
3011 });
3012
3013 let conn = client.connect(server_addr, TEST_ALPN).await?;
3014 let stats_task = AbortOnDropHandle::new(tokio::spawn(collect_path_infos(conn.clone())));
3015 let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
3016 send.write_all(&vec![42u8; transfer_size]).await.anyerr()?;
3017 send.finish().anyerr()?;
3018 recv.read_to_end(transfer_size).await.anyerr()?;
3019 conn.close(0u32.into(), b"thanks, bye!");
3020 client.close().await;
3021 let client_stats = stats_task.await.std_context("client stats task failed")?;
3022 let server_stats = server_task.await.anyerr()??;
3023
3024 info!("client stats: {client_stats:#?}");
3025 info!("server stats: {server_stats:#?}");
3026
3027 let client_total_relay_tx = client_stats
3028 .values()
3029 .filter(|p| p.is_relay())
3030 .map(|p| p.stats().udp_tx.bytes)
3031 .sum::<u64>();
3032 let client_total_relay_rx = client_stats
3033 .values()
3034 .filter(|p| p.is_relay())
3035 .map(|p| p.stats().udp_rx.bytes)
3036 .sum::<u64>();
3037 let server_total_relay_tx = server_stats
3038 .values()
3039 .filter(|p| p.is_relay())
3040 .map(|p| p.stats().udp_tx.bytes)
3041 .sum::<u64>();
3042 let server_total_relay_rx = server_stats
3043 .values()
3044 .filter(|p| p.is_relay())
3045 .map(|p| p.stats().udp_rx.bytes)
3046 .sum::<u64>();
3047
3048 info!(?client_total_relay_tx, "total");
3049 info!(?client_total_relay_rx, "total");
3050 info!(?server_total_relay_tx, "total");
3051 info!(?server_total_relay_rx, "total");
3052
3053 assert!(client_total_relay_tx < transfer_size as u64 / 2);
3055 assert!(client_total_relay_rx < transfer_size as u64 / 2);
3056 assert!(server_total_relay_tx < transfer_size as u64 / 2);
3057 assert!(server_total_relay_rx < transfer_size as u64 / 2);
3058
3059 Ok(())
3060 }
3061}