iroh_net/endpoint.rs
1//! The [`Endpoint`] allows establishing connections to other iroh-net nodes.
2//!
3//! The [`Endpoint`] is the main API interface to manage a local iroh-net node. It allows
4//! connecting to and accepting connections from other nodes. See the [module docs] for
5//! more details on how iroh-net connections work.
6//!
7//! The main items in this module are:
8//!
9//! - [`Endpoint`] to establish iroh-net connections with other nodes.
10//! - [`Builder`] to create an [`Endpoint`].
11//!
12//! [module docs]: crate
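//!
//! # Example
//!
//! A minimal connect/accept sketch; the ALPN `b"hypothetical/example/0"` is a placeholder
//! and error handling is elided:
//!
//! ```no_run
//! use iroh_net::{Endpoint, NodeAddr};
//!
//! # async fn sketch() -> anyhow::Result<()> {
//! // Accepting side: bind an endpoint and handle incoming connections.
//! let ep = Endpoint::builder()
//!     .alpns(vec![b"hypothetical/example/0".to_vec()])
//!     .bind()
//!     .await?;
//! let addr: NodeAddr = ep.node_addr().await?;
//! tokio::spawn(async move {
//!     while let Some(incoming) = ep.accept().await {
//!         if let Ok(_conn) = incoming.await {
//!             // handle the connection, e.g. accept streams on it
//!         }
//!     }
//! });
//!
//! // Connecting side: dial the node by its `NodeAddr` and open a stream.
//! let client = Endpoint::builder().bind().await?;
//! let conn = client.connect(addr, b"hypothetical/example/0").await?;
//! let mut send = conn.open_uni().await?;
//! send.write_all(b"hello").await?;
//! send.finish()?;
//! # Ok(())
//! # }
//! ```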
13
14use std::{
15 any::Any,
16 future::{Future, IntoFuture},
17 net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
18 pin::Pin,
19 sync::Arc,
20 task::Poll,
21 time::Duration,
22};
23
24use anyhow::{anyhow, bail, Context, Result};
25use derive_more::Debug;
26use futures_lite::{Stream, StreamExt};
27use pin_project::pin_project;
28use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
29use tracing::{debug, instrument, trace, warn};
30use url::Url;
31
32use crate::{
33 discovery::{
34 dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryTask,
35 },
36 dns::{default_resolver, DnsResolver},
37 key::{PublicKey, SecretKey},
38 magicsock::{self, Handle, QuicMappedAddr},
39 relay::{force_staging_infra, RelayMode, RelayUrl},
40 tls, NodeId,
41};
42
43mod rtt_actor;
44
45pub use bytes::Bytes;
46pub use iroh_base::node_addr::{AddrInfo, NodeAddr};
47// Missing still: SendDatagram and ConnectionClose::frame_type's Type.
48pub use quinn::{
49 AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream, Connection,
50 ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni,
51 ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError,
52 SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt,
53 WeakConnectionHandle, WriteError, ZeroRttAccepted,
54};
55pub use quinn_proto::{
56 congestion::{Controller, ControllerFactory},
57 crypto::{
58 AeadKey, CryptoError, ExportKeyingMaterialError, HandshakeTokenKey,
59 ServerConfig as CryptoServerConfig, UnsupportedVersion,
60 },
61 FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written,
62};
63
64use self::rtt_actor::RttMessage;
65pub use super::magicsock::{
66 ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddr, DirectAddrInfo, DirectAddrType,
67 DirectAddrsStream, RemoteInfo, Source,
68};
69
70/// The delay to fall back to discovery when direct addresses fail.
71///
/// When a connection is attempted with a [`NodeAddr`] containing direct addresses, the
/// [`Endpoint`] assumes one of those addresses probably works. If there is still no
/// connection after this delay, the configured [`Discovery`] service is used as well.
75const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500);
76
77type DiscoveryBuilder = Box<dyn FnOnce(&SecretKey) -> Option<Box<dyn Discovery>> + Send + Sync>;
78
79/// Builder for [`Endpoint`].
80///
81/// By default the endpoint will generate a new random [`SecretKey`], which will result in a
82/// new [`NodeId`].
83///
84/// To create the [`Endpoint`] call [`Builder::bind`].
85#[derive(Debug)]
86pub struct Builder {
87 secret_key: Option<SecretKey>,
88 relay_mode: RelayMode,
89 alpn_protocols: Vec<Vec<u8>>,
90 transport_config: Option<quinn::TransportConfig>,
91 keylog: bool,
92 #[debug(skip)]
93 discovery: Vec<DiscoveryBuilder>,
94 proxy_url: Option<Url>,
95 /// List of known nodes. See [`Builder::known_nodes`].
96 node_map: Option<Vec<NodeAddr>>,
97 dns_resolver: Option<DnsResolver>,
98 #[cfg(any(test, feature = "test-utils"))]
99 #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
100 insecure_skip_relay_cert_verify: bool,
101 addr_v4: Option<SocketAddrV4>,
102 addr_v6: Option<SocketAddrV6>,
103}
104
105impl Default for Builder {
106 fn default() -> Self {
107 Self {
108 secret_key: Default::default(),
109 relay_mode: default_relay_mode(),
110 alpn_protocols: Default::default(),
111 transport_config: Default::default(),
112 keylog: Default::default(),
113 discovery: Default::default(),
114 proxy_url: None,
115 node_map: None,
116 dns_resolver: None,
117 #[cfg(any(test, feature = "test-utils"))]
118 insecure_skip_relay_cert_verify: false,
119 addr_v4: None,
120 addr_v6: None,
121 }
122 }
123}
124
125impl Builder {
126 // The ordering of public methods is reflected directly in the documentation. This is
127 // roughly ordered by what is most commonly needed by users.
128
129 // # The final constructor that everyone needs.
130
131 /// Binds the magic endpoint.
132 pub async fn bind(self) -> Result<Endpoint> {
133 let relay_map = self.relay_mode.relay_map();
134 let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate);
135 let static_config = StaticConfig {
136 transport_config: Arc::new(self.transport_config.unwrap_or_default()),
137 keylog: self.keylog,
138 secret_key: secret_key.clone(),
139 };
140 let dns_resolver = self
141 .dns_resolver
142 .unwrap_or_else(|| default_resolver().clone());
143 let discovery = self
144 .discovery
145 .into_iter()
146 .filter_map(|f| f(&secret_key))
147 .collect::<Vec<_>>();
148 let discovery: Option<Box<dyn Discovery>> = match discovery.len() {
149 0 => None,
150 1 => Some(discovery.into_iter().next().unwrap()),
151 _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))),
152 };
153 let msock_opts = magicsock::Options {
154 addr_v4: self.addr_v4,
155 addr_v6: self.addr_v6,
156 secret_key,
157 relay_map,
158 node_map: self.node_map,
159 discovery,
160 proxy_url: self.proxy_url,
161 dns_resolver,
162 #[cfg(any(test, feature = "test-utils"))]
163 insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
164 };
165 Endpoint::bind(static_config, msock_opts, self.alpn_protocols).await
166 }
167
168 // # The very common methods everyone basically needs.
169
170 /// Sets the IPv4 bind address.
171 ///
172 /// Setting the port to `0` will use a random port.
    /// If the specified port is already in use, it will fall back to choosing a random port.
    ///
    /// By default the endpoint binds to `0.0.0.0:0`.
176 pub fn bind_addr_v4(mut self, addr: SocketAddrV4) -> Self {
177 self.addr_v4.replace(addr);
178 self
179 }
180
181 /// Sets the IPv6 bind address.
182 ///
183 /// Setting the port to `0` will use a random port.
    /// If the specified port is already in use, it will fall back to choosing a random port.
    ///
    /// By default the endpoint binds to `[::]:0`.
187 pub fn bind_addr_v6(mut self, addr: SocketAddrV6) -> Self {
188 self.addr_v6.replace(addr);
189 self
190 }
191
192 /// Sets a secret key to authenticate with other peers.
193 ///
    /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus
    /// also its [`NodeId`].
196 ///
197 /// If not set, a new secret key will be generated.
198 pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
199 self.secret_key = Some(secret_key);
200 self
201 }
202
203 /// Sets the [ALPN] protocols that this endpoint will accept on incoming connections.
204 ///
    /// Not setting this will still allow creating outgoing connections, but incoming
    /// connections can only be accepted if the [ALPN] is set.
207 ///
208 /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
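    ///
    /// # Examples
    ///
    /// A minimal sketch; the ALPN string is a placeholder:
    ///
    /// ```no_run
    /// # async fn sketch() -> anyhow::Result<()> {
    /// use iroh_net::Endpoint;
    ///
    /// let ep = Endpoint::builder()
    ///     .alpns(vec![b"hypothetical/example/0".to_vec()])
    ///     .bind()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```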
209 pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
210 self.alpn_protocols = alpn_protocols;
211 self
212 }
213
214 // # Methods for common customisation items.
215
216 /// Sets the relay servers to assist in establishing connectivity.
217 ///
    /// Relay servers are used to establish the initial connection with another iroh-net node.
219 /// They also perform various functions related to hole punching, see the [crate docs]
220 /// for more details.
221 ///
222 /// By default the [number 0] relay servers are used, see [`RelayMode::Default`].
223 ///
    /// When using [`RelayMode::Custom`], the provided `relay_map` must contain at least one
    /// configured relay node. If an invalid `RelayMap` is provided, [`bind`]
    /// will result in an error.
227 ///
228 /// [`bind`]: Builder::bind
229 /// [crate docs]: crate
230 /// [number 0]: https://n0.computer
231 pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
232 self.relay_mode = relay_mode;
233 self
234 }
235
236 /// Removes all discovery services from the builder.
237 pub fn clear_discovery(mut self) -> Self {
238 self.discovery.clear();
239 self
240 }
241
242 /// Optionally sets a discovery mechanism for this endpoint.
243 ///
244 /// If you want to combine multiple discovery services, you can use
245 /// [`Builder::add_discovery`] instead. This will internally create a
246 /// [`crate::discovery::ConcurrentDiscovery`].
247 ///
248 /// If no discovery service is set, connecting to a node without providing its
249 /// direct addresses or relay URLs will fail.
250 ///
251 /// See the documentation of the [`Discovery`] trait for details.
252 pub fn discovery(mut self, discovery: Box<dyn Discovery>) -> Self {
253 self.discovery.clear();
254 self.discovery.push(Box::new(move |_| Some(discovery)));
255 self
256 }
257
258 /// Adds a discovery mechanism for this endpoint.
259 ///
260 /// The function `discovery`
261 /// will be called on endpoint creation with the configured secret key of
262 /// the endpoint. Discovery services that need to publish information need
263 /// to use this secret key to sign the information.
264 ///
265 /// If you add multiple discovery services, they will be combined using a
266 /// [`crate::discovery::ConcurrentDiscovery`].
267 ///
268 /// If no discovery service is set, connecting to a node without providing its
269 /// direct addresses or relay URLs will fail.
270 ///
271 /// To clear all discovery services, use [`Builder::clear_discovery`].
272 ///
273 /// See the documentation of the [`Discovery`] trait for details.
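    ///
    /// # Examples
    ///
    /// A sketch that wires up the n0 pkarr publisher and DNS discovery by hand (the same
    /// combination that [`Builder::discovery_n0`] configures):
    ///
    /// ```no_run
    /// # async fn sketch() -> anyhow::Result<()> {
    /// use iroh_net::discovery::{dns::DnsDiscovery, pkarr::PkarrPublisher};
    /// use iroh_net::Endpoint;
    ///
    /// let ep = Endpoint::builder()
    ///     .add_discovery(|secret_key| Some(PkarrPublisher::n0_dns(secret_key.clone())))
    ///     .add_discovery(|_| Some(DnsDiscovery::n0_dns()))
    ///     .bind()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```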
274 pub fn add_discovery<F, D>(mut self, discovery: F) -> Self
275 where
276 F: FnOnce(&SecretKey) -> Option<D> + Send + Sync + 'static,
277 D: Discovery + 'static,
278 {
279 let discovery: DiscoveryBuilder =
280 Box::new(move |secret_key| discovery(secret_key).map(|x| Box::new(x) as _));
281 self.discovery.push(discovery);
282 self
283 }
284
285 /// Configures the endpoint to use the default n0 DNS discovery service.
286 ///
287 /// The default discovery service publishes to and resolves from the
    /// n0.computer DNS server `iroh.link`.
289 ///
290 /// This is equivalent to adding both a [`crate::discovery::pkarr::PkarrPublisher`]
291 /// and a [`crate::discovery::dns::DnsDiscovery`], both configured to use the
292 /// n0.computer dns server.
293 ///
294 /// This will by default use [`N0_DNS_PKARR_RELAY_PROD`].
295 /// When in tests, or when the `test-utils` feature is enabled, this will use the
296 /// [`N0_DNS_PKARR_RELAY_STAGING`].
297 ///
298 /// [`N0_DNS_PKARR_RELAY_PROD`]: crate::discovery::pkarr::N0_DNS_PKARR_RELAY_PROD
299 /// [`N0_DNS_PKARR_RELAY_STAGING`]: crate::discovery::pkarr::N0_DNS_PKARR_RELAY_STAGING
300 pub fn discovery_n0(mut self) -> Self {
301 self.discovery.push(Box::new(|secret_key| {
302 Some(Box::new(PkarrPublisher::n0_dns(secret_key.clone())))
303 }));
304 self.discovery
305 .push(Box::new(|_| Some(Box::new(DnsDiscovery::n0_dns()))));
306 self
307 }
308
309 #[cfg(feature = "discovery-pkarr-dht")]
310 /// Configures the endpoint to also use the mainline DHT with default settings.
311 ///
312 /// This is equivalent to adding a [`crate::discovery::pkarr::dht::DhtDiscovery`]
313 /// with default settings. Note that DhtDiscovery has various more advanced
314 /// configuration options. If you need any of those, you should manually
315 /// create a DhtDiscovery and add it with [`Builder::add_discovery`].
316 pub fn discovery_dht(mut self) -> Self {
317 use crate::discovery::pkarr::dht::DhtDiscovery;
318 self.discovery.push(Box::new(|secret_key| {
319 Some(Box::new(
320 DhtDiscovery::builder()
321 .secret_key(secret_key.clone())
322 .build()
323 .unwrap(),
324 ))
325 }));
326 self
327 }
328
329 #[cfg(feature = "discovery-local-network")]
330 /// Configures the endpoint to also use local network discovery.
331 ///
332 /// This is equivalent to adding a [`crate::discovery::local_swarm_discovery::LocalSwarmDiscovery`]
333 /// with default settings. Note that LocalSwarmDiscovery has various more advanced
334 /// configuration options. If you need any of those, you should manually
335 /// create a LocalSwarmDiscovery and add it with [`Builder::add_discovery`].
336 pub fn discovery_local_network(mut self) -> Self {
337 use crate::discovery::local_swarm_discovery::LocalSwarmDiscovery;
338 self.discovery.push(Box::new(|secret_key| {
339 LocalSwarmDiscovery::new(secret_key.public())
340 .map(|x| Box::new(x) as _)
341 .ok()
342 }));
343 self
344 }
345
346 /// Optionally set a list of known nodes.
347 pub fn known_nodes(mut self, nodes: Vec<NodeAddr>) -> Self {
348 self.node_map = Some(nodes);
349 self
350 }
351
352 // # Methods for more specialist customisation.
353
354 /// Sets a custom [`quinn::TransportConfig`] for this endpoint.
355 ///
356 /// The transport config contains parameters governing the QUIC state machine.
357 ///
358 /// If unset, the default config is used. Default values should be suitable for most
    /// internet applications. Application protocols which forbid remotely-initiated
360 /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
361 /// zero.
362 pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self {
363 self.transport_config = Some(transport_config);
364 self
365 }
366
367 /// Optionally sets a custom DNS resolver to use for this endpoint.
368 ///
369 /// The DNS resolver is used to resolve relay hostnames, and node addresses if
370 /// [`crate::discovery::dns::DnsDiscovery`] is configured.
371 ///
372 /// By default, all endpoints share a DNS resolver, which is configured to use the
373 /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`]
374 /// here to use a differently configured DNS resolver for this endpoint.
375 pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
376 self.dns_resolver = Some(dns_resolver);
377 self
378 }
379
380 /// Sets an explicit proxy url to proxy all HTTP(S) traffic through.
381 pub fn proxy_url(mut self, url: Url) -> Self {
382 self.proxy_url.replace(url);
383 self
384 }
385
386 /// Sets the proxy url from the environment, in this order:
387 ///
388 /// - `HTTP_PROXY`
389 /// - `http_proxy`
390 /// - `HTTPS_PROXY`
391 /// - `https_proxy`
392 pub fn proxy_from_env(mut self) -> Self {
393 self.proxy_url = proxy_url_from_env();
394 self
395 }
396
397 /// Enables saving the TLS pre-master key for connections.
398 ///
399 /// This key should normally remain secret but can be useful to debug networking issues
400 /// by decrypting captured traffic.
401 ///
402 /// If *keylog* is `true` then setting the `SSLKEYLOGFILE` environment variable to a
403 /// filename will result in this file being used to log the TLS pre-master keys.
404 pub fn keylog(mut self, keylog: bool) -> Self {
405 self.keylog = keylog;
406 self
407 }
408
    /// Skip verification of SSL certificates from relay servers.
410 ///
411 /// May only be used in tests.
412 #[cfg(any(test, feature = "test-utils"))]
413 #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
414 pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
415 self.insecure_skip_relay_cert_verify = skip_verify;
416 self
417 }
418}
419
420/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.
421#[derive(Debug)]
422struct StaticConfig {
423 secret_key: SecretKey,
424 transport_config: Arc<quinn::TransportConfig>,
425 keylog: bool,
426}
427
428impl StaticConfig {
429 /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols.
430 fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> Result<ServerConfig> {
431 let server_config = make_server_config(
432 &self.secret_key,
433 alpn_protocols,
434 self.transport_config.clone(),
435 self.keylog,
436 )?;
437 Ok(server_config)
438 }
439}
440
441/// Creates a [`ServerConfig`] with the given secret key and limits.
// This return type can no longer be used anywhere in our public API. It is however still
443// used by iroh::node::Node (or rather iroh::node::Builder) to create a plain Quinn
444// endpoint.
445pub fn make_server_config(
446 secret_key: &SecretKey,
447 alpn_protocols: Vec<Vec<u8>>,
448 transport_config: Arc<TransportConfig>,
449 keylog: bool,
450) -> Result<ServerConfig> {
451 let quic_server_config = tls::make_server_config(secret_key, alpn_protocols, keylog)?;
452 let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
453 server_config.transport_config(transport_config);
454
455 Ok(server_config)
456}
457
458/// Controls an iroh-net node, establishing connections with other nodes.
459///
460/// This is the main API interface to create connections to, and accept connections from
/// other iroh-net nodes. The connections are peer-to-peer and encrypted; a Relay server is
462/// used to make the connections reliable. See the [crate docs] for a more detailed
463/// overview of iroh-net.
464///
465/// It is recommended to only create a single instance per application. This ensures all
466/// the connections made share the same peer-to-peer connections to other iroh-net nodes,
467/// while still remaining independent connections. This will result in more optimal network
468/// behaviour.
469///
470/// New connections are typically created using the [`Endpoint::connect`] and
471/// [`Endpoint::accept`] methods. Once established, the [`Connection`] gives access to most
472/// [QUIC] features. Individual streams to send data to the peer are created using the
473/// [`Connection::open_bi`], [`Connection::accept_bi`], [`Connection::open_uni`] and
/// [`Connection::accept_uni`] functions.
475///
/// Note that due to the lightweight nature of streams, a stream will only be accepted
477/// once the initiating peer has sent some data on it.
478///
479/// [QUIC]: https://quicwg.org
480#[derive(Clone, Debug)]
481pub struct Endpoint {
482 msock: Handle,
483 endpoint: quinn::Endpoint,
484 rtt_actor: Arc<rtt_actor::RttHandle>,
485 cancel_token: CancellationToken,
486 static_config: Arc<StaticConfig>,
487}
488
489impl Endpoint {
490 // The ordering of public methods is reflected directly in the documentation. This is
491 // roughly ordered by what is most commonly needed by users, but grouped in similar
492 // items.
493
494 // # Methods relating to construction.
495
496 /// Returns the builder for an [`Endpoint`], with a production configuration.
497 pub fn builder() -> Builder {
498 Builder::default()
499 }
500
501 /// Creates a quinn endpoint backed by a magicsock.
502 ///
    /// This is for internal use; the public interface is the [`Builder`] obtained from
    /// [`Self::builder`]. See the methods on the builder for documentation of the parameters.
505 #[instrument("ep", skip_all, fields(me = %static_config.secret_key.public().fmt_short()))]
506 async fn bind(
507 static_config: StaticConfig,
508 msock_opts: magicsock::Options,
509 initial_alpns: Vec<Vec<u8>>,
510 ) -> Result<Self> {
511 let msock = magicsock::MagicSock::spawn(msock_opts).await?;
512 trace!("created magicsock");
513
514 let server_config = static_config.create_server_config(initial_alpns)?;
515
516 let mut endpoint_config = quinn::EndpointConfig::default();
517 // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit
        // set to 0. The fixed bit is the second-most-significant bit (0x40) of the first byte of a packet.
519 // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight
520 // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore
521 // the packet if grease_quic_bit is set to false.
522 endpoint_config.grease_quic_bit(false);
523
524 let endpoint = quinn::Endpoint::new_with_abstract_socket(
525 endpoint_config,
526 Some(server_config),
527 Arc::new(msock.clone()),
528 Arc::new(quinn::TokioRuntime),
529 )?;
530 trace!("created quinn endpoint");
531 debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");
532 Ok(Self {
533 msock,
534 endpoint,
535 rtt_actor: Arc::new(rtt_actor::RttHandle::new()),
536 cancel_token: CancellationToken::new(),
537 static_config: Arc::new(static_config),
538 })
539 }
540
541 /// Sets the list of accepted ALPN protocols.
542 ///
543 /// This will only affect new incoming connections.
544 /// Note that this *overrides* the current list of ALPNs.
545 pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) -> Result<()> {
546 let server_config = self.static_config.create_server_config(alpns)?;
547 self.endpoint.set_server_config(Some(server_config));
548 Ok(())
549 }
550
551 // # Methods for establishing connectivity.
552
553 /// Connects to a remote [`Endpoint`].
554 ///
555 /// A value that can be converted into a [`NodeAddr`] is required. This can be either a
556 /// [`NodeAddr`], a [`NodeId`] or a [`iroh_base::ticket::NodeTicket`].
557 ///
558 /// The [`NodeAddr`] must contain the [`NodeId`] to dial and may also contain a [`RelayUrl`]
559 /// and direct addresses. If direct addresses are provided, they will be used to try and
560 /// establish a direct connection without involving a relay server.
561 ///
    /// If neither a [`RelayUrl`] nor direct addresses are configured in the [`NodeAddr`], it
    /// may still be possible to establish a connection. This depends on other calls
564 /// to [`Endpoint::add_node_addr`] which may provide contact information, or via the
565 /// [`Discovery`] service configured using [`Builder::discovery`]. The discovery
566 /// service will also be used if the remote node is not reachable on the provided direct
567 /// addresses and there is no [`RelayUrl`].
568 ///
569 /// If addresses or relay servers are neither provided nor can be discovered, the
570 /// connection attempt will fail with an error.
571 ///
572 /// The `alpn`, or application-level protocol identifier, is also required. The remote
573 /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
574 /// an error.
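    ///
    /// # Examples
    ///
    /// A minimal sketch; `node_addr` is assumed to have been obtained from the remote node
    /// (e.g. via a ticket or another out-of-band channel) and the ALPN is a placeholder:
    ///
    /// ```no_run
    /// # use iroh_net::{Endpoint, NodeAddr};
    /// # async fn sketch(node_addr: NodeAddr) -> anyhow::Result<()> {
    /// let ep = Endpoint::builder().bind().await?;
    /// let conn = ep.connect(node_addr, b"hypothetical/example/0").await?;
    /// let mut send = conn.open_uni().await?;
    /// send.write_all(b"hello").await?;
    /// send.finish()?;
    /// # Ok(())
    /// # }
    /// ```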
575 #[instrument(skip_all, fields(me = %self.node_id().fmt_short(), alpn = ?String::from_utf8_lossy(alpn)))]
576 pub async fn connect(&self, node_addr: impl Into<NodeAddr>, alpn: &[u8]) -> Result<Connection> {
577 let node_addr = node_addr.into();
578 tracing::Span::current().record("remote", node_addr.node_id.fmt_short());
579 // Connecting to ourselves is not supported.
580 if node_addr.node_id == self.node_id() {
581 bail!(
582 "Connecting to ourself is not supported ({} is the node id of this node)",
583 node_addr.node_id.fmt_short()
584 );
585 }
586
587 if !node_addr.info.is_empty() {
588 self.add_node_addr(node_addr.clone())?;
589 }
590
591 let NodeAddr { node_id, info } = node_addr.clone();
592
593 // Get the mapped IPv6 address from the magic socket. Quinn will connect to this address.
594 // Start discovery for this node if it's enabled and we have no valid or verified
595 // address information for this node.
596 let (addr, discovery) = self
597 .get_mapping_addr_and_maybe_start_discovery(node_addr)
598 .await
599 .with_context(|| {
600 format!(
601 "No addressing information for NodeId({}), unable to connect",
602 node_id.fmt_short()
603 )
604 })?;
605
606 debug!(
607 "connecting to {}: (via {} - {:?})",
608 node_id, addr, info.direct_addresses
609 );
610
611 // Start connecting via quinn. This will time out after 10 seconds if no reachable address
612 // is available.
613 let conn = self.connect_quinn(node_id, alpn, addr).await;
614
615 // Cancel the node discovery task (if still running).
616 if let Some(discovery) = discovery {
617 discovery.cancel();
618 }
619
620 conn
621 }
622
    /// Connects to a remote endpoint, using just the node's [`NodeId`].
624 ///
625 /// This is a convenience function for [`Endpoint::connect`]. It relies on addressing
626 /// information being provided by either the discovery service or using
627 /// [`Endpoint::add_node_addr`]. See [`Endpoint::connect`] for the details of how it
628 /// uses the discovery service to establish a connection to a remote node.
629 #[deprecated(
630 since = "0.27.0",
631 note = "Please use `connect` directly with a NodeId. This fn will be removed in 0.28.0."
632 )]
633 pub async fn connect_by_node_id(&self, node_id: NodeId, alpn: &[u8]) -> Result<Connection> {
634 let addr = NodeAddr::new(node_id);
635 self.connect(addr, alpn).await
636 }
637
638 #[instrument(
639 skip_all,
640 fields(remote_node = node_id.fmt_short(), alpn = %String::from_utf8_lossy(alpn))
641 )]
642 async fn connect_quinn(
643 &self,
644 node_id: NodeId,
645 alpn: &[u8],
646 addr: QuicMappedAddr,
647 ) -> Result<Connection> {
648 debug!("Attempting connection...");
649 let client_config = {
650 let alpn_protocols = vec![alpn.to_vec()];
651 let quic_client_config = tls::make_client_config(
652 &self.static_config.secret_key,
653 Some(node_id),
654 alpn_protocols,
655 self.static_config.keylog,
656 )?;
657 let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
658 let mut transport_config = quinn::TransportConfig::default();
659 transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
660 client_config.transport_config(Arc::new(transport_config));
661 client_config
662 };
663
664 // TODO: We'd eventually want to replace "localhost" with something that makes more sense.
665 let connect = self
666 .endpoint
667 .connect_with(client_config, addr.0, "localhost")?;
668
669 let connection = connect
670 .await
671 .context("failed connecting to remote endpoint")?;
672
673 let rtt_msg = RttMessage::NewConnection {
674 connection: connection.weak_handle(),
675 conn_type_changes: self.conn_type_stream(node_id)?,
676 node_id,
677 };
678 if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await {
679 // If this actor is dead, that's not great but we can still function.
680 warn!("rtt-actor not reachable: {err:#}");
681 }
682 debug!("Connection established");
683 Ok(connection)
684 }
685
686 /// Accepts an incoming connection on the endpoint.
687 ///
688 /// Only connections with the ALPNs configured in [`Builder::alpns`] will be accepted.
689 /// If multiple ALPNs have been configured the ALPN can be inspected before accepting
690 /// the connection using [`Connecting::alpn`].
691 ///
692 /// The returned future will yield `None` if the endpoint is closed by calling
693 /// [`Endpoint::close`].
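    ///
    /// # Examples
    ///
    /// A sketch of an accept loop; the ALPNs are assumed to have been configured via
    /// [`Builder::alpns`] and error handling is elided:
    ///
    /// ```no_run
    /// # async fn sketch() -> anyhow::Result<()> {
    /// # let ep = iroh_net::Endpoint::builder().bind().await?;
    /// while let Some(incoming) = ep.accept().await {
    ///     let _conn = incoming.await?;
    ///     // Handle the connection, e.g. by spawning a task to accept its streams.
    /// }
    /// # Ok(())
    /// # }
    /// ```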
694 pub fn accept(&self) -> Accept<'_> {
695 Accept {
696 inner: self.endpoint.accept(),
697 ep: self.clone(),
698 }
699 }
700
701 // # Methods for manipulating the internal state about other nodes.
702
703 /// Informs this [`Endpoint`] about addresses of the iroh-net node.
704 ///
705 /// This updates the local state for the remote node. If the provided [`NodeAddr`]
706 /// contains a [`RelayUrl`] this will be used as the new relay server for this node. If
707 /// it contains any new IP endpoints they will also be stored and tried when next
708 /// connecting to this node. Any address that matches this node's direct addresses will be
709 /// silently ignored.
710 ///
711 /// See also [`Endpoint::add_node_addr_with_source`].
712 ///
713 /// # Errors
714 ///
715 /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
716 /// direct addresses are a subset of ours.
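    ///
    /// # Examples
    ///
    /// A sketch with a made-up remote node and direct address:
    ///
    /// ```no_run
    /// # async fn sketch() -> anyhow::Result<()> {
    /// use iroh_net::{key::SecretKey, Endpoint, NodeAddr};
    ///
    /// let ep = Endpoint::builder().bind().await?;
    /// // Normally the node id and address come from the remote node; these are placeholders.
    /// let node_id = SecretKey::generate().public();
    /// let direct_addr: std::net::SocketAddr = "192.0.2.7:11204".parse()?;
    /// ep.add_node_addr(NodeAddr::new(node_id).with_direct_addresses([direct_addr]))?;
    /// # Ok(())
    /// # }
    /// ```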
717 pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> {
718 self.add_node_addr_inner(node_addr, magicsock::Source::App)
719 }
720
721 /// Informs this [`Endpoint`] about addresses of the iroh-net node, noting the source.
722 ///
723 /// This updates the local state for the remote node. If the provided [`NodeAddr`] contains a
724 /// [`RelayUrl`] this will be used as the new relay server for this node. If it contains any
725 /// new IP endpoints they will also be stored and tried when next connecting to this node. Any
726 /// address that matches this node's direct addresses will be silently ignored. The *source* is
727 /// used for logging exclusively and will not be stored.
728 ///
729 /// # Errors
730 ///
731 /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
732 /// direct addresses are a subset of ours.
733 pub fn add_node_addr_with_source(
734 &self,
735 node_addr: NodeAddr,
736 source: &'static str,
737 ) -> Result<()> {
738 self.add_node_addr_inner(
739 node_addr,
740 magicsock::Source::NamedApp {
741 name: source.into(),
742 },
743 )
744 }
745
746 fn add_node_addr_inner(&self, node_addr: NodeAddr, source: magicsock::Source) -> Result<()> {
747 // Connecting to ourselves is not supported.
748 if node_addr.node_id == self.node_id() {
749 bail!(
750 "Adding our own address is not supported ({} is the node id of this node)",
751 node_addr.node_id.fmt_short()
752 );
753 }
754 self.msock.add_node_addr(node_addr, source)
755 }
756
757 // # Getter methods for properties of this Endpoint itself.
758
759 /// Returns the secret_key of this endpoint.
760 pub fn secret_key(&self) -> &SecretKey {
761 &self.static_config.secret_key
762 }
763
764 /// Returns the node id of this endpoint.
765 ///
766 /// This ID is the unique addressing information of this node and other peers must know
767 /// it to be able to connect to this node.
768 pub fn node_id(&self) -> NodeId {
769 self.static_config.secret_key.public()
770 }
771
772 /// Returns the current [`NodeAddr`] for this endpoint.
773 ///
774 /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and local IP endpoints
775 /// as they would be returned by [`Endpoint::home_relay`] and
776 /// [`Endpoint::direct_addresses`].
777 pub async fn node_addr(&self) -> Result<NodeAddr> {
778 let addrs = self
779 .direct_addresses()
780 .next()
781 .await
782 .ok_or(anyhow!("No IP endpoints found"))?;
783 let relay = self.home_relay();
784 Ok(NodeAddr::from_parts(
785 self.node_id(),
786 relay,
787 addrs.into_iter().map(|x| x.addr),
788 ))
789 }
790
791 /// Returns the [`RelayUrl`] of the Relay server used as home relay.
792 ///
793 /// Every endpoint has a home Relay server which it chooses as the server with the
794 /// lowest latency out of the configured servers provided by [`Builder::relay_mode`].
795 /// This is the server other iroh-net nodes can use to reliably establish a connection
796 /// to this node.
797 ///
798 /// Returns `None` if we are not connected to any Relay server.
799 ///
800 /// Note that this will be `None` right after the [`Endpoint`] is created since it takes
    /// some time to find and connect to the home relay server. Use
802 /// [`Endpoint::watch_home_relay`] to wait until the home relay server is available.
803 pub fn home_relay(&self) -> Option<RelayUrl> {
804 self.msock.my_relay()
805 }
806
807 /// Watches for changes to the home relay.
808 ///
809 /// If there is currently a home relay it will be yielded immediately as the first item
810 /// in the stream. This makes it possible to use this function to wait for the initial
811 /// home relay to be known.
812 ///
813 /// Note that it is not guaranteed that a home relay will ever become available. If no
814 /// servers are configured with [`Builder::relay_mode`] this stream will never yield an
815 /// item.
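    ///
    /// # Examples
    ///
    /// A sketch that waits until the home relay is known:
    ///
    /// ```no_run
    /// use futures_lite::StreamExt;
    /// # async fn sketch() -> anyhow::Result<()> {
    /// # let ep = iroh_net::Endpoint::builder().bind().await?;
    /// let mut stream = std::pin::pin!(ep.watch_home_relay());
    /// let _home_relay = stream.next().await;
    /// # Ok(())
    /// # }
    /// ```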
816 pub fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
817 self.msock.watch_home_relay()
818 }
819
820 /// Returns the direct addresses of this [`Endpoint`].
821 ///
822 /// The direct addresses of the [`Endpoint`] are those that could be used by other
823 /// iroh-net nodes to establish direct connectivity, depending on the network
824 /// situation. The yielded lists of direct addresses contain both the locally-bound
825 /// addresses and the [`Endpoint`]'s publicly reachable addresses discovered through
826 /// mechanisms such as [STUN] and port mapping. Hence usually only a subset of these
827 /// will be applicable to a certain remote iroh-net node.
828 ///
829 /// The [`Endpoint`] continuously monitors the direct addresses for changes as its own
830 /// location in the network might change. Whenever changes are detected this stream
831 /// will yield a new list of direct addresses.
832 ///
    /// When issuing the first call to this method the first direct address discovery might
    /// still be underway; in this case the first item of the returned stream will not be
    /// immediately available. Once the first set of local IP endpoints has been discovered,
    /// the stream will always yield the most recently discovered set of IP endpoints
    /// immediately as its first item.
838 ///
839 /// # Examples
840 ///
841 /// To get the current endpoints, drop the stream after the first item was received:
842 /// ```
843 /// use futures_lite::StreamExt;
844 /// use iroh_net::Endpoint;
845 ///
846 /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
847 /// # rt.block_on(async move {
848 /// let mep = Endpoint::builder().bind().await.unwrap();
849 /// let _addrs = mep.direct_addresses().next().await;
850 /// # });
851 /// ```
852 ///
853 /// [STUN]: https://en.wikipedia.org/wiki/STUN
854 pub fn direct_addresses(&self) -> DirectAddrsStream {
855 self.msock.direct_addresses()
856 }
857
858 /// Returns the local socket addresses on which the underlying sockets are bound.
859 ///
860 /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
861 /// address if available.
862 pub fn bound_sockets(&self) -> (SocketAddr, Option<SocketAddr>) {
863 self.msock.local_addr()
864 }
865
866 // # Getter methods for information about other nodes.
867
868 /// Returns information about the remote node identified by a [`NodeId`].
869 ///
870 /// The [`Endpoint`] keeps some information about remote iroh-net nodes, which it uses to find
871 /// the best path to a node. Having information on a remote node, however, does not mean we have
    /// ever connected to it, or that a connection is even possible. The information about a
873 /// remote node will change over time, as the [`Endpoint`] learns more about the node. Future
874 /// calls may return different information. Furthermore, node information may even be
875 /// completely evicted as it becomes stale.
876 ///
877 /// See also [`Endpoint::remote_info_iter`] which returns information on all nodes known
878 /// by this [`Endpoint`].
879 pub fn remote_info(&self, node_id: NodeId) -> Option<RemoteInfo> {
880 self.msock.remote_info(node_id)
881 }
882
883 /// Returns information about all the remote nodes this [`Endpoint`] knows about.
884 ///
885 /// This returns the same information as [`Endpoint::remote_info`] for each node known to this
886 /// [`Endpoint`].
887 ///
888 /// The [`Endpoint`] keeps some information about remote iroh-net nodes, which it uses to find
889 /// the best path to a node. This returns all the nodes it knows about, regardless of whether a
890 /// connection was ever made or is even possible.
891 ///
892 /// See also [`Endpoint::remote_info`] to only retrieve information about a single node.
893 pub fn remote_info_iter(&self) -> impl Iterator<Item = RemoteInfo> {
894 self.msock.list_remote_infos().into_iter()
895 }
896
897 // # Methods for less common getters.
898 //
899 // Partially they return things passed into the builder.
900
901 /// Returns a stream that reports connection type changes for the remote node.
902 ///
    /// This returns a stream of [`ConnectionType`] items; each time the underlying
904 /// connection to a remote node changes it yields an item. These connection changes are
905 /// when the connection switches between using the Relay server and a direct connection.
906 ///
907 /// If there is currently a connection with the remote node the first item in the stream
908 /// will yield immediately returning the current connection type.
909 ///
910 /// Note that this does not guarantee each connection change is yielded in the stream.
911 /// If the connection type changes several times before this stream is polled only the
912 /// last recorded state is returned. This can be observed e.g. right at the start of a
913 /// connection when the switch from a relayed to a direct connection can be so fast that
914 /// the relayed state is never exposed.
915 ///
916 /// # Errors
917 ///
918 /// Will error if we do not have any address information for the given `node_id`.
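    ///
    /// # Examples
    ///
    /// A sketch that logs every connection type change for a known `node_id`:
    ///
    /// ```no_run
    /// use futures_lite::StreamExt;
    /// # async fn sketch(ep: iroh_net::Endpoint, node_id: iroh_net::NodeId) -> anyhow::Result<()> {
    /// let mut conn_types = std::pin::pin!(ep.conn_type_stream(node_id)?);
    /// while let Some(conn_type) = conn_types.next().await {
    ///     println!("connection type changed: {conn_type:?}");
    /// }
    /// # Ok(())
    /// # }
    /// ```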
919 pub fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
920 self.msock.conn_type_stream(node_id)
921 }
922
923 /// Returns the DNS resolver used in this [`Endpoint`].
924 ///
    /// See [`Builder::dns_resolver`].
926 pub fn dns_resolver(&self) -> &DnsResolver {
927 self.msock.dns_resolver()
928 }
929
930 /// Returns the discovery mechanism, if configured.
931 ///
    /// See [`Builder::discovery`].
933 pub fn discovery(&self) -> Option<&dyn Discovery> {
934 self.msock.discovery()
935 }
936
937 // # Methods for less common state updates.
938
939 /// Notifies the system of potential network changes.
940 ///
941 /// On many systems iroh is able to detect network changes by itself, however
942 /// some systems like android do not expose this functionality to native code.
943 /// Android does however provide this functionality to Java code. This
944 /// function allows for notifying iroh of any potential network changes like
945 /// this.
946 ///
947 /// Even when the network did not change, or iroh was already able to detect
948 /// the network change itself, there is no harm in calling this function.
949 pub async fn network_change(&self) {
950 self.msock.network_change().await;
951 }
952
953 // # Methods for terminating the endpoint.
954
955 /// Closes the QUIC endpoint and the magic socket.
956 ///
957 /// This will close all open QUIC connections with the provided error_code and
958 /// reason. See [`quinn::Connection`] for details on how these are interpreted.
959 ///
960 /// It will then wait for all connections to actually be shutdown, and afterwards
961 /// close the magic socket.
962 ///
963 /// Returns an error if closing the magic socket failed.
964 /// TODO: Document error cases.
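    ///
    /// # Examples
    ///
    /// A sketch of a graceful shutdown; error code `0` and the reason are arbitrary choices:
    ///
    /// ```no_run
    /// # async fn sketch() -> anyhow::Result<()> {
    /// # let ep = iroh_net::Endpoint::builder().bind().await?;
    /// ep.close(0u32.into(), b"done").await?;
    /// # Ok(())
    /// # }
    /// ```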
965 pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> {
966 let Endpoint {
967 msock,
968 endpoint,
969 cancel_token,
970 ..
971 } = self;
972 cancel_token.cancel();
973 tracing::debug!("Closing connections");
974 endpoint.close(error_code, reason);
975 endpoint.wait_idle().await;
976 // In case this is the last clone of `Endpoint`, dropping the `quinn::Endpoint` will
977 // make it more likely that the underlying socket is not polled by quinn anymore after this
978 drop(endpoint);
979 tracing::debug!("Connections closed");
980
981 msock.close().await?;
982 Ok(())
983 }
984
985 // # Remaining private methods
986
987 pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> {
988 self.cancel_token.cancelled()
989 }
990
991 /// Return the quic mapped address for this `node_id` and possibly start discovery
992 /// services if discovery is enabled on this magic endpoint.
993 ///
994 /// This will launch discovery in all cases except if:
995 /// 1) we do not have discovery enabled
    /// 2) we have discovery enabled, but already have at least one verified, unexpired
    ///    address for this `node_id`
998 ///
999 /// # Errors
1000 ///
1001 /// This method may fail if we have no way of dialing the node. This can occur if
1002 /// we were given no dialing information in the [`NodeAddr`] and no discovery
1003 /// services were configured or if discovery failed to fetch any dialing information.
1004 async fn get_mapping_addr_and_maybe_start_discovery(
1005 &self,
1006 node_addr: NodeAddr,
1007 ) -> Result<(QuicMappedAddr, Option<DiscoveryTask>)> {
1008 let node_id = node_addr.node_id;
1009
1010 // Only return a mapped addr if we have some way of dialing this node, in other
1011 // words, we have either a relay URL or at least one direct address.
1012 let addr = if self.msock.has_send_address(node_id) {
1013 self.msock.get_mapping_addr(node_id)
1014 } else {
1015 None
1016 };
1017 match addr {
1018 Some(addr) => {
1019 // We have some way of dialing this node, but that doesn't actually mean
                // we can connect to any of these addresses.
1021 // Therefore, we will invoke the discovery service if we haven't received from the
1022 // endpoint on any of the existing paths recently.
1023 // If the user provided addresses in this connect call, we will add a delay
1024 // followed by a recheck before starting the discovery, to give the magicsocket a
1025 // chance to test the newly provided addresses.
1026 let delay = (!node_addr.info.is_empty()).then_some(DISCOVERY_WAIT_PERIOD);
1027 let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay)
1028 .ok()
1029 .flatten();
1030 Ok((addr, discovery))
1031 }
1032
1033 None => {
1034 // We have no known addresses or relay URLs for this node.
1035 // So, we start a discovery task and wait for the first result to arrive, and
1036 // only then continue, because otherwise we wouldn't have any
1037 // path to the remote endpoint.
1038 let mut discovery = DiscoveryTask::start(self.clone(), node_id)
1039 .context("Discovery service required due to missing addressing information")?;
1040 discovery
1041 .first_arrived()
1042 .await
1043 .context("Discovery service failed")?;
1044 if let Some(addr) = self.msock.get_mapping_addr(node_id) {
1045 Ok((addr, Some(discovery)))
1046 } else {
1047 bail!("Discovery did not find addressing information");
1048 }
1049 }
1050 }
1051 }
1052
1053 #[cfg(test)]
1054 pub(crate) fn magic_sock(&self) -> Handle {
1055 self.msock.clone()
1056 }
1057 #[cfg(test)]
1058 pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
1059 &self.endpoint
1060 }
1061}
1062
1063/// Future produced by [`Endpoint::accept`].
1064#[derive(Debug)]
1065#[pin_project]
1066pub struct Accept<'a> {
1067 #[pin]
1068 #[debug("quinn::Accept")]
1069 inner: quinn::Accept<'a>,
1070 ep: Endpoint,
1071}
1072
1073impl<'a> Future for Accept<'a> {
1074 type Output = Option<Incoming>;
1075
1076 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1077 let this = self.project();
1078 match this.inner.poll(cx) {
1079 Poll::Pending => Poll::Pending,
1080 Poll::Ready(None) => Poll::Ready(None),
1081 Poll::Ready(Some(inner)) => Poll::Ready(Some(Incoming {
1082 inner,
1083 ep: this.ep.clone(),
1084 })),
1085 }
1086 }
1087}
1088
/// An incoming connection for which the server has not yet begun its part of the
1090/// handshake.
1091#[derive(Debug)]
1092pub struct Incoming {
1093 inner: quinn::Incoming,
1094 ep: Endpoint,
1095}
1096
1097impl Incoming {
1098 /// Attempts to accept this incoming connection (an error may still occur).
1099 ///
1100 /// Errors occurring here are likely not caused by the application or remote. The QUIC
1101 /// connection listens on a normal UDP socket and any reachable network endpoint can
1102 /// send datagrams to it, solicited or not. Even if the first few bytes look like a
1103 /// QUIC packet, it might not even be a QUIC packet that is being received.
1104 ///
1105 /// Thus it is common to simply log the errors here and accept them as something which
1106 /// can happen.
1107 pub fn accept(self) -> Result<Connecting, ConnectionError> {
1108 self.inner.accept().map(|conn| Connecting {
1109 inner: conn,
1110 ep: self.ep,
1111 })
1112 }
1113
1114 /// Accepts this incoming connection using a custom configuration.
1115 ///
1116 /// See [`accept()`] for more details.
1117 ///
1118 /// [`accept()`]: Incoming::accept
1119 pub fn accept_with(
1120 self,
1121 server_config: Arc<ServerConfig>,
1122 ) -> Result<Connecting, ConnectionError> {
1123 self.inner
1124 .accept_with(server_config)
1125 .map(|conn| Connecting {
1126 inner: conn,
1127 ep: self.ep,
1128 })
1129 }
1130
1131 /// Rejects this incoming connection attempt.
1132 pub fn refuse(self) {
1133 self.inner.refuse()
1134 }
1135
1136 /// Responds with a retry packet.
1137 ///
1138 /// This requires the client to retry with address validation.
1139 ///
1140 /// Errors if `remote_address_validated()` is true.
1141 pub fn retry(self) -> Result<(), RetryError> {
1142 self.inner.retry()
1143 }
1144
1145 /// Ignores this incoming connection attempt, not sending any packet in response.
1146 pub fn ignore(self) {
1147 self.inner.ignore()
1148 }
1149
1150 /// Returns the local IP address which was used when the peer established the
1151 /// connection.
1152 pub fn local_ip(&self) -> Option<IpAddr> {
1153 self.inner.local_ip()
1154 }
1155
1156 /// Returns the peer's UDP address.
1157 pub fn remote_address(&self) -> SocketAddr {
1158 self.inner.remote_address()
1159 }
1160
1161 /// Whether the socket address that is initiating this connection has been validated.
1162 ///
1163 /// This means that the sender of the initial packet has proved that they can receive
1164 /// traffic sent to `self.remote_address()`.
1165 pub fn remote_address_validated(&self) -> bool {
1166 self.inner.remote_address_validated()
1167 }
1168}
1169
1170impl IntoFuture for Incoming {
1171 type Output = Result<Connection, ConnectionError>;
1172 type IntoFuture = IncomingFuture;
1173
1174 fn into_future(self) -> Self::IntoFuture {
1175 IncomingFuture {
1176 inner: self.inner.into_future(),
1177 ep: self.ep,
1178 }
1179 }
1180}
1181
1182/// Adaptor to let [`Incoming`] be `await`ed like a [`Connecting`].
1183#[derive(Debug)]
1184#[pin_project]
1185pub struct IncomingFuture {
1186 #[pin]
1187 inner: quinn::IncomingFuture,
1188 ep: Endpoint,
1189}
1190
1191impl Future for IncomingFuture {
1192 type Output = Result<quinn::Connection, ConnectionError>;
1193
1194 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1195 let this = self.project();
1196 match this.inner.poll(cx) {
1197 Poll::Pending => Poll::Pending,
1198 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1199 Poll::Ready(Ok(conn)) => {
1200 try_send_rtt_msg(&conn, this.ep);
1201 Poll::Ready(Ok(conn))
1202 }
1203 }
1204 }
1205}
1206
1207/// In-progress connection attempt future
1208#[derive(Debug)]
1209#[pin_project]
1210pub struct Connecting {
1211 #[pin]
1212 inner: quinn::Connecting,
1213 ep: Endpoint,
1214}
1215
1216impl Connecting {
1217 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security.
1218 pub fn into_0rtt(self) -> Result<(Connection, ZeroRttAccepted), Self> {
1219 match self.inner.into_0rtt() {
1220 Ok((conn, zrtt_accepted)) => {
1221 try_send_rtt_msg(&conn, &self.ep);
1222 Ok((conn, zrtt_accepted))
1223 }
1224 Err(inner) => Err(Self { inner, ep: self.ep }),
1225 }
1226 }
1227
1228 /// Parameters negotiated during the handshake
1229 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
1230 self.inner.handshake_data().await
1231 }
1232
1233 /// The local IP address which was used when the peer established the connection.
1234 pub fn local_ip(&self) -> Option<IpAddr> {
1235 self.inner.local_ip()
1236 }
1237
1238 /// The peer's UDP address.
1239 pub fn remote_address(&self) -> SocketAddr {
1240 self.inner.remote_address()
1241 }
1242
1243 /// Extracts the ALPN protocol from the peer's handshake data.
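    ///
    /// A sketch of dispatching on the ALPN before completing the handshake; the ALPN bytes
    /// are placeholders:
    ///
    /// ```no_run
    /// # async fn sketch(ep: iroh_net::Endpoint) -> anyhow::Result<()> {
    /// if let Some(incoming) = ep.accept().await {
    ///     let mut connecting = incoming.accept()?;
    ///     match connecting.alpn().await?.as_slice() {
    ///         b"hypothetical/example/0" => {
    ///             let _conn = connecting.await?;
    ///             // handle the connection
    ///         }
    ///         _ => { /* unknown ALPN */ }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```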
1244 // Note, we could totally provide this method to be on a Connection as well. But we'd
1245 // need to wrap Connection too.
1246 pub async fn alpn(&mut self) -> Result<Vec<u8>> {
1247 let data = self.handshake_data().await?;
1248 match data.downcast::<quinn::crypto::rustls::HandshakeData>() {
1249 Ok(data) => match data.protocol {
1250 Some(protocol) => Ok(protocol),
1251 None => bail!("no ALPN protocol available"),
1252 },
1253 Err(_) => bail!("unknown handshake type"),
1254 }
1255 }
1256}
1257
1258impl Future for Connecting {
1259 type Output = Result<Connection, ConnectionError>;
1260
1261 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1262 let this = self.project();
1263 match this.inner.poll(cx) {
1264 Poll::Pending => Poll::Pending,
1265 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1266 Poll::Ready(Ok(conn)) => {
1267 try_send_rtt_msg(&conn, this.ep);
1268 Poll::Ready(Ok(conn))
1269 }
1270 }
1271 }
1272}
1273
1274/// Extract the [`PublicKey`] from the peer's TLS certificate.
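///
/// A sketch of identifying the remote node on an accepted connection:
///
/// ```no_run
/// use iroh_net::endpoint::get_remote_node_id;
/// # async fn sketch(ep: iroh_net::Endpoint) -> anyhow::Result<()> {
/// if let Some(incoming) = ep.accept().await {
///     let conn = incoming.await?;
///     let node_id = get_remote_node_id(&conn)?;
///     println!("connection from {node_id}");
/// }
/// # Ok(())
/// # }
/// ```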
1275// TODO: make this a method now
1276pub fn get_remote_node_id(connection: &Connection) -> Result<PublicKey> {
1277 let data = connection.peer_identity();
1278 match data {
1279 None => bail!("no peer certificate found"),
1280 Some(data) => match data.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
1281 Ok(certs) => {
1282 if certs.len() != 1 {
1283 bail!(
1284 "expected a single peer certificate, but {} found",
1285 certs.len()
1286 );
1287 }
1288 let cert = tls::certificate::parse(&certs[0])?;
1289 Ok(cert.peer_id())
1290 }
1291 Err(_) => bail!("invalid peer certificate"),
1292 },
1293 }
1294}
1295
1296/// Try send a message to the rtt-actor.
1297///
/// If we can't notify the actor, that will impact performance a little, but we can still
1299/// function.
1300fn try_send_rtt_msg(conn: &Connection, magic_ep: &Endpoint) {
1301 // If we can't notify the rtt-actor that's not great but not critical.
1302 let Ok(peer_id) = get_remote_node_id(conn) else {
1303 warn!(?conn, "failed to get remote node id");
1304 return;
1305 };
1306 let Ok(conn_type_changes) = magic_ep.conn_type_stream(peer_id) else {
1307 warn!(?conn, "failed to create conn_type_stream");
1308 return;
1309 };
1310 let rtt_msg = RttMessage::NewConnection {
1311 connection: conn.weak_handle(),
1312 conn_type_changes,
1313 node_id: peer_id,
1314 };
1315 if let Err(err) = magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) {
1316 warn!(?conn, "rtt-actor not reachable: {err:#}");
1317 }
1318}
1319
/// Reads a proxy url from the environment, in this order:
1321///
1322/// - `HTTP_PROXY`
1323/// - `http_proxy`
1324/// - `HTTPS_PROXY`
1325/// - `https_proxy`
1326fn proxy_url_from_env() -> Option<Url> {
1327 if let Some(url) = std::env::var("HTTP_PROXY")
1328 .ok()
1329 .and_then(|s| s.parse::<Url>().ok())
1330 {
1331 if is_cgi() {
1332 warn!("HTTP_PROXY environment variable ignored in CGI");
1333 } else {
1334 return Some(url);
1335 }
1336 }
1337 if let Some(url) = std::env::var("http_proxy")
1338 .ok()
1339 .and_then(|s| s.parse::<Url>().ok())
1340 {
1341 return Some(url);
1342 }
1343 if let Some(url) = std::env::var("HTTPS_PROXY")
1344 .ok()
1345 .and_then(|s| s.parse::<Url>().ok())
1346 {
1347 return Some(url);
1348 }
1349 if let Some(url) = std::env::var("https_proxy")
1350 .ok()
1351 .and_then(|s| s.parse::<Url>().ok())
1352 {
1353 return Some(url);
1354 }
1355
1356 None
1357}
1358
1359/// Returns the default relay mode.
1360///
/// If the `IROH_FORCE_STAGING_RELAYS` environment variable is non-empty, it will return `RelayMode::Staging`.
1362/// Otherwise, it will return `RelayMode::Default`.
1363pub fn default_relay_mode() -> RelayMode {
1364 // Use staging in testing
1365 match force_staging_infra() {
1366 true => RelayMode::Staging,
1367 false => RelayMode::Default,
1368 }
1369}
1370
1371/// Check if we are being executed in a CGI context.
1372///
1373/// If so, a malicious client can send the `Proxy:` header, and it will
1374/// be in the `HTTP_PROXY` env var. So we don't use it :)
1375fn is_cgi() -> bool {
1376 std::env::var_os("REQUEST_METHOD").is_some()
1377}
1378
1379// TODO: These tests could still be flaky, lets fix that:
1380// https://github.com/n0-computer/iroh/issues/1183
1381#[cfg(test)]
1382mod tests {
1383
1384 use std::time::Instant;
1385
1386 use iroh_test::CallOnDrop;
1387 use rand::SeedableRng;
1388 use tracing::{error_span, info, info_span, Instrument};
1389
1390 use super::*;
1391 use crate::test_utils::{run_relay_server, run_relay_server_with};
1392
1393 const TEST_ALPN: &[u8] = b"n0/iroh/test";
1394
1395 #[test]
1396 fn test_addr_info_debug() {
1397 let info = AddrInfo {
1398 relay_url: Some("https://relay.example.com".parse().unwrap()),
1399 direct_addresses: vec![SocketAddr::from(([1, 2, 3, 4], 1234))]
1400 .into_iter()
1401 .collect(),
1402 };
1403 assert_eq!(
1404 format!("{:?}", info),
1405 r#"AddrInfo { relay_url: Some(RelayUrl("https://relay.example.com./")), direct_addresses: {1.2.3.4:1234} }"#
1406 );
1407 }
1408
1409 #[tokio::test]
1410 async fn test_connect_self() {
1411 let _guard = iroh_test::logging::setup();
1412 let ep = Endpoint::builder()
1413 .alpns(vec![TEST_ALPN.to_vec()])
1414 .bind()
1415 .await
1416 .unwrap();
1417 let my_addr = ep.node_addr().await.unwrap();
1418 let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
1419 assert!(res.is_err());
1420 let err = res.err().unwrap();
1421 assert!(err.to_string().starts_with("Connecting to ourself"));
1422
1423 let res = ep.add_node_addr(my_addr);
1424 assert!(res.is_err());
1425 let err = res.err().unwrap();
1426 assert!(err.to_string().starts_with("Adding our own address"));
1427 }
1428
1429 #[tokio::test]
1430 async fn endpoint_connect_close() {
1431 let _guard = iroh_test::logging::setup();
1432 let (relay_map, relay_url, _guard) = run_relay_server().await.unwrap();
1433 let server_secret_key = SecretKey::generate();
1434 let server_peer_id = server_secret_key.public();
1435
1436 let server = {
1437 let relay_map = relay_map.clone();
1438 tokio::spawn(
1439 async move {
1440 let ep = Endpoint::builder()
1441 .secret_key(server_secret_key)
1442 .alpns(vec![TEST_ALPN.to_vec()])
1443 .relay_mode(RelayMode::Custom(relay_map))
1444 .insecure_skip_relay_cert_verify(true)
1445 .bind()
1446 .await
1447 .unwrap();
1448 info!("accepting connection");
1449 let incoming = ep.accept().await.unwrap();
1450 let conn = incoming.await.unwrap();
1451 let mut stream = conn.accept_uni().await.unwrap();
1452 let mut buf = [0u8; 5];
1453 stream.read_exact(&mut buf).await.unwrap();
1454 info!("Accepted 1 stream, received {buf:?}. Closing now.");
1455 // close the connection
1456 conn.close(7u8.into(), b"bye");
1457
1458 let res = conn.accept_uni().await;
1459 assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);
1460
1461 let res = stream.read_to_end(10).await;
1462 assert_eq!(
1463 res.unwrap_err(),
1464 quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
1465 quinn::ConnectionError::LocallyClosed
1466 ))
1467 );
1468 info!("server test completed");
1469 }
1470 .instrument(info_span!("test-server")),
1471 )
1472 };
1473
1474 let client = tokio::spawn(
1475 async move {
1476 let ep = Endpoint::builder()
1477 .alpns(vec![TEST_ALPN.to_vec()])
1478 .relay_mode(RelayMode::Custom(relay_map))
1479 .insecure_skip_relay_cert_verify(true)
1480 .bind()
1481 .await
1482 .unwrap();
1483 info!("client connecting");
1484 let node_addr = NodeAddr::new(server_peer_id).with_relay_url(relay_url);
1485 let conn = ep.connect(node_addr, TEST_ALPN).await.unwrap();
1486 let mut stream = conn.open_uni().await.unwrap();
1487
1488 // First write is accepted by server. We need this bit of synchronisation
1489 // because if the server closes after simply accepting the connection we can
1490 // not be sure our .open_uni() call would succeed as it may already receive
1491 // the error.
1492 stream.write_all(b"hello").await.unwrap();
1493
1494 info!("waiting for closed");
1495 // Remote now closes the connection, we should see an error sometime soon.
1496 let err = conn.closed().await;
1497 let expected_err =
1498 quinn::ConnectionError::ApplicationClosed(quinn::ApplicationClose {
1499 error_code: 7u8.into(),
1500 reason: b"bye".to_vec().into(),
1501 });
1502 assert_eq!(err, expected_err);
1503
1504 info!("opening new - expect it to fail");
1505 let res = conn.open_uni().await;
1506 assert_eq!(res.unwrap_err(), expected_err);
1507 info!("client test completed");
1508 }
1509 .instrument(info_span!("test-client")),
1510 );
1511
1512 let (server, client) = tokio::time::timeout(
1513 Duration::from_secs(30),
1514 futures_lite::future::zip(server, client),
1515 )
1516 .await
1517 .expect("timeout");
1518 server.unwrap();
1519 client.unwrap();
1520 }
1521
1522 /// Test that peers are properly restored
    #[tokio::test]
    async fn restore_peers() {
        let _guard = iroh_test::logging::setup();

        let secret_key = SecretKey::generate();

        /// Create an endpoint for the test.
        async fn new_endpoint(secret_key: SecretKey, nodes: Option<Vec<NodeAddr>>) -> Endpoint {
            let mut transport_config = quinn::TransportConfig::default();
            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));

            let mut builder = Endpoint::builder()
                .secret_key(secret_key.clone())
                .transport_config(transport_config);
            if let Some(nodes) = nodes {
                builder = builder.known_nodes(nodes);
            }
            builder
                .alpns(vec![TEST_ALPN.to_vec()])
                .bind()
                .await
                .unwrap()
        }

        // create the peer that will be added to the peer map
        let peer_id = SecretKey::generate().public();
        let direct_addr: SocketAddr =
            (std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 8758u16).into();
        let node_addr = NodeAddr::new(peer_id).with_direct_addresses([direct_addr]);

        info!("setting up first endpoint");
        // First, create an endpoint with no known nodes, then manually add addressing
        // information for a peer.
        let endpoint = new_endpoint(secret_key.clone(), None).await;
        assert_eq!(endpoint.remote_info_iter().count(), 0);
        endpoint.add_node_addr(node_addr.clone()).unwrap();

        // Grab the current addrs
        let node_addrs: Vec<NodeAddr> = endpoint.remote_info_iter().map(Into::into).collect();
        assert_eq!(node_addrs.len(), 1);
        assert_eq!(node_addrs[0], node_addr);

        info!("closing endpoint");
        // close the endpoint and restart it
        endpoint.close(0u32.into(), b"done").await.unwrap();

        info!("restarting endpoint");
        // now restart it and check the addressing info of the peer
        let endpoint = new_endpoint(secret_key, Some(node_addrs)).await;
        let RemoteInfo { mut addrs, .. } = endpoint.remote_info(peer_id).unwrap();
        let conn_addr = addrs.pop().unwrap().addr;
        assert_eq!(conn_addr, direct_addr);
    }

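    /// Spins up a server endpoint and has several clients connect to it over the
    /// relay in sequence, echoing a few chunks of data back and forth each time.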
    #[tokio::test]
    async fn endpoint_relay_connect_loop() {
        let _logging_guard = iroh_test::logging::setup();
        let start = Instant::now();
        let n_clients = 5;
        let n_chunks_per_client = 2;
        let chunk_size = 10;
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
        let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
        let server_secret_key = SecretKey::generate_with_rng(&mut rng);
        let server_node_id = server_secret_key.public();

        // The server accepts the connections of the clients sequentially.
        let server = {
            let relay_map = relay_map.clone();
            tokio::spawn(
                async move {
                    let ep = Endpoint::builder()
                        .insecure_skip_relay_cert_verify(true)
                        .secret_key(server_secret_key)
                        .alpns(vec![TEST_ALPN.to_vec()])
                        .relay_mode(RelayMode::Custom(relay_map))
                        .bind()
                        .await
                        .unwrap();
                    let eps = ep.bound_sockets();
                    info!(me = %ep.node_id().fmt_short(), ipv4=%eps.0, ipv6=?eps.1, "server bound");
                    for i in 0..n_clients {
                        let now = Instant::now();
                        println!("[server] round {}", i + 1);
                        let incoming = ep.accept().await.unwrap();
                        let conn = incoming.await.unwrap();
                        let peer_id = get_remote_node_id(&conn).unwrap();
                        info!(%i, peer = %peer_id.fmt_short(), "accepted connection");
                        let (mut send, mut recv) = conn.accept_bi().await.unwrap();
                        let mut buf = vec![0u8; chunk_size];
                        for _i in 0..n_chunks_per_client {
                            recv.read_exact(&mut buf).await.unwrap();
                            send.write_all(&buf).await.unwrap();
                        }
                        send.finish().unwrap();
                        send.stopped().await.unwrap();
                        recv.read_to_end(0).await.unwrap();
                        info!(%i, peer = %peer_id.fmt_short(), "finished");
                        println!("[server] round {} done in {:?}", i + 1, now.elapsed());
                    }
                }
                .instrument(error_span!("server")),
            )
        };
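        // Abort the server task when the test ends, even on panic, so it does not
        // linger in the background.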
        let abort_handle = server.abort_handle();
        let _server_guard = CallOnDrop::new(move || {
            abort_handle.abort();
        });

        for i in 0..n_clients {
            let now = Instant::now();
            println!("[client] round {}", i + 1);
            let relay_map = relay_map.clone();
            let client_secret_key = SecretKey::generate_with_rng(&mut rng);
            let relay_url = relay_url.clone();
            async {
                info!("client binding");
                let ep = Endpoint::builder()
                    .alpns(vec![TEST_ALPN.to_vec()])
                    .insecure_skip_relay_cert_verify(true)
                    .relay_mode(RelayMode::Custom(relay_map))
                    .secret_key(client_secret_key)
                    .bind()
                    .await
                    .unwrap();
                let eps = ep.bound_sockets();
                info!(me = %ep.node_id().fmt_short(), ipv4=%eps.0, ipv6=?eps.1, "client bound");
                let node_addr = NodeAddr::new(server_node_id).with_relay_url(relay_url);
                info!(to = ?node_addr, "client connecting");
                let conn = ep.connect(node_addr, TEST_ALPN).await.unwrap();
                info!("client connected");
                let (mut send, mut recv) = conn.open_bi().await.unwrap();

                for i in 0..n_chunks_per_client {
                    let mut buf = vec![i; chunk_size];
                    send.write_all(&buf).await.unwrap();
                    recv.read_exact(&mut buf).await.unwrap();
                    assert_eq!(buf, vec![i; chunk_size]);
                }
                send.finish().unwrap();
                send.stopped().await.unwrap();
                recv.read_to_end(0).await.unwrap();
                info!("client finished");
                ep.close(0u32.into(), &[]).await.unwrap();
                info!("client closed");
            }
            .instrument(error_span!("client", %i))
            .await;
            println!("[client] round {} done in {:?}", i + 1, now.elapsed());
        }

        server.await.unwrap();

        // This test has been observed to be very slow at times, so fail if it takes
        // too long. We only make a handful of connections transferring very little
        // data; this really shouldn't take long.
        if start.elapsed() > Duration::from_secs(15) {
            panic!("Test too slow, something went wrong");
        }
    }

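    /// Two endpoints connect to each other at the same time; each side does a
    /// hello/world round trip on a bidirectional stream.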
    #[tokio::test]
    async fn endpoint_bidi_send_recv() {
        let _logging_guard = iroh_test::logging::setup();
        let ep1 = Endpoint::builder()
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Disabled)
            .bind()
            .await
            .unwrap();
        let ep2 = Endpoint::builder()
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Disabled)
            .bind()
            .await
            .unwrap();
        let ep1_nodeaddr = ep1.node_addr().await.unwrap();
        let ep2_nodeaddr = ep2.node_addr().await.unwrap();
        ep1.add_node_addr(ep2_nodeaddr.clone()).unwrap();
        ep2.add_node_addr(ep1_nodeaddr.clone()).unwrap();
        let ep1_nodeid = ep1.node_id();
        let ep2_nodeid = ep2.node_id();
        eprintln!("node id 1 {ep1_nodeid}");
        eprintln!("node id 2 {ep2_nodeid}");

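        /// Connect to `dst`, send "hello", expect "world" back, then close the connection.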
        async fn connect_hello(ep: Endpoint, dst: NodeAddr) {
            let conn = ep.connect(dst, TEST_ALPN).await.unwrap();
            let (mut send, mut recv) = conn.open_bi().await.unwrap();
            info!("sending hello");
            send.write_all(b"hello").await.unwrap();
            send.finish().unwrap();
            info!("receiving world");
            let m = recv.read_to_end(100).await.unwrap();
            assert_eq!(m, b"world");
            conn.close(1u8.into(), b"done");
        }

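        /// Accept a connection from `src`, expect "hello", reply with "world", then
        /// wait for the remote to close the connection.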
        async fn accept_world(ep: Endpoint, src: NodeId) {
            let incoming = ep.accept().await.unwrap();
            let mut iconn = incoming.accept().unwrap();
            let alpn = iconn.alpn().await.unwrap();
            let conn = iconn.await.unwrap();
            let node_id = get_remote_node_id(&conn).unwrap();
            assert_eq!(node_id, src);
            assert_eq!(alpn, TEST_ALPN);
            let (mut send, mut recv) = conn.accept_bi().await.unwrap();
            info!("receiving hello");
            let m = recv.read_to_end(100).await.unwrap();
            assert_eq!(m, b"hello");
            info!("sending hello");
            send.write_all(b"world").await.unwrap();
            send.finish().unwrap();
            match conn.closed().await {
                ConnectionError::ApplicationClosed(closed) => {
                    assert_eq!(closed.error_code, 1u8.into());
                }
                _ => panic!("wrong close error"),
            }
        }

        let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_nodeid).instrument(info_span!(
            "p1_accept",
            ep1 = %ep1.node_id().fmt_short(),
            dst = %ep2_nodeid.fmt_short(),
        )));
        let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_nodeid).instrument(info_span!(
            "p2_accept",
            ep2 = %ep2.node_id().fmt_short(),
            dst = %ep1_nodeid.fmt_short(),
        )));
        let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_nodeaddr).instrument(
            info_span!(
                "p1_connect",
                ep1 = %ep1.node_id().fmt_short(),
                dst = %ep2_nodeid.fmt_short(),
            ),
        ));
        let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_nodeaddr).instrument(
            info_span!(
                "p2_connect",
                ep2 = %ep2.node_id().fmt_short(),
                dst = %ep1_nodeid.fmt_short(),
            ),
        ));

        p1_accept.await.unwrap();
        p2_accept.await.unwrap();
        p1_connect.await.unwrap();
        p2_connect.await.unwrap();
    }

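    /// Checks that [`Endpoint::conn_type_stream`] reports when the connection
    /// between two endpoints becomes direct.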
    #[tokio::test]
    async fn endpoint_conn_type_stream() {
        const TIMEOUT: Duration = std::time::Duration::from_secs(15);
        let _logging_guard = iroh_test::logging::setup();
        let (relay_map, _relay_url, _relay_guard) = run_relay_server().await.unwrap();
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
        let ep1_secret_key = SecretKey::generate_with_rng(&mut rng);
        let ep2_secret_key = SecretKey::generate_with_rng(&mut rng);
        let ep1 = Endpoint::builder()
            .secret_key(ep1_secret_key)
            .insecure_skip_relay_cert_verify(true)
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .bind()
            .await
            .unwrap();
        let ep2 = Endpoint::builder()
            .secret_key(ep2_secret_key)
            .insecure_skip_relay_cert_verify(true)
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Custom(relay_map))
            .bind()
            .await
            .unwrap();

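        /// Watch the connection type to `node_id` until it reports a direct
        /// connection, failing if the stream ends first.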
        async fn handle_direct_conn(ep: &Endpoint, node_id: PublicKey) -> Result<()> {
            let mut stream = ep.conn_type_stream(node_id)?;
            let src = ep.node_id().fmt_short();
            let dst = node_id.fmt_short();
            while let Some(conn_type) = stream.next().await {
                tracing::info!(me = %src, dst = %dst, conn_type = ?conn_type);
                if matches!(conn_type, ConnectionType::Direct(_)) {
                    return Ok(());
                }
            }
            anyhow::bail!("conn_type stream ended before `ConnectionType::Direct`");
        }

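        /// Accept a single incoming connection and return the remote's node id.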
        async fn accept(ep: &Endpoint) -> NodeId {
            let incoming = ep.accept().await.unwrap();
            let conn = incoming.await.unwrap();
            let node_id = get_remote_node_id(&conn).unwrap();
            tracing::info!(node_id=%node_id.fmt_short(), "accepted connection");
            node_id
        }

        let ep1_nodeid = ep1.node_id();
        let ep2_nodeid = ep2.node_id();

        let ep1_nodeaddr = ep1.node_addr().await.unwrap();
        tracing::info!(
            "node id 1 {ep1_nodeid}, relay URL {:?}",
            ep1_nodeaddr.relay_url()
        );
        tracing::info!("node id 2 {ep2_nodeid}");

        let ep1_side = async move {
            accept(&ep1).await;
            handle_direct_conn(&ep1, ep2_nodeid).await
        };

        let ep2_side = async move {
            ep2.connect(ep1_nodeaddr, TEST_ALPN).await.unwrap();
            handle_direct_conn(&ep2, ep1_nodeid).await
        };

        let res_ep1 = tokio::spawn(tokio::time::timeout(TIMEOUT, ep1_side));

        let ep1_abort_handle = res_ep1.abort_handle();
        let _ep1_guard = CallOnDrop::new(move || {
            ep1_abort_handle.abort();
        });

        let res_ep2 = tokio::spawn(tokio::time::timeout(TIMEOUT, ep2_side));
        let ep2_abort_handle = res_ep2.abort_handle();
        let _ep2_guard = CallOnDrop::new(move || {
            ep2_abort_handle.abort();
        });

        let (r1, r2) = tokio::try_join!(res_ep1, res_ep2).unwrap();
        r1.expect("ep1 timeout").unwrap();
        r2.expect("ep2 timeout").unwrap();
    }

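    /// The endpoint should still discover direct addresses when the relay server
    /// provides no STUN.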
    #[tokio::test]
    async fn test_direct_addresses_no_stun_relay() {
        let _guard = iroh_test::logging::setup();
        let (relay_map, _, _guard) = run_relay_server_with(None).await.unwrap();

        let ep = Endpoint::builder()
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Custom(relay_map))
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await
            .unwrap();

        tokio::time::timeout(Duration::from_secs(10), ep.direct_addresses().next())
            .await
            .unwrap()
            .unwrap();
    }
}