iroh_net/
endpoint.rs

1//! The [`Endpoint`] allows establishing connections to other iroh-net nodes.
2//!
3//! The [`Endpoint`] is the main API interface to manage a local iroh-net node.  It allows
4//! connecting to and accepting connections from other nodes.  See the [module docs] for
5//! more details on how iroh-net connections work.
6//!
7//! The main items in this module are:
8//!
9//! - [`Endpoint`] to establish iroh-net connections with other nodes.
10//! - [`Builder`] to create an [`Endpoint`].
11//!
12//! [module docs]: crate
13
14use std::{
15    any::Any,
16    future::{Future, IntoFuture},
17    net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
18    pin::Pin,
19    sync::Arc,
20    task::Poll,
21    time::Duration,
22};
23
24use anyhow::{anyhow, bail, Context, Result};
25use derive_more::Debug;
26use futures_lite::{Stream, StreamExt};
27use pin_project::pin_project;
28use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
29use tracing::{debug, instrument, trace, warn};
30use url::Url;
31
32use crate::{
33    discovery::{
34        dns::DnsDiscovery, pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryTask,
35    },
36    dns::{default_resolver, DnsResolver},
37    key::{PublicKey, SecretKey},
38    magicsock::{self, Handle, QuicMappedAddr},
39    relay::{force_staging_infra, RelayMode, RelayUrl},
40    tls, NodeId,
41};
42
43mod rtt_actor;
44
45pub use bytes::Bytes;
46pub use iroh_base::node_addr::{AddrInfo, NodeAddr};
47// Missing still: SendDatagram and ConnectionClose::frame_type's Type.
48pub use quinn::{
49    AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream, Connection,
50    ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni,
51    ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError,
52    SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt,
53    WeakConnectionHandle, WriteError, ZeroRttAccepted,
54};
55pub use quinn_proto::{
56    congestion::{Controller, ControllerFactory},
57    crypto::{
58        AeadKey, CryptoError, ExportKeyingMaterialError, HandshakeTokenKey,
59        ServerConfig as CryptoServerConfig, UnsupportedVersion,
60    },
61    FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written,
62};
63
64use self::rtt_actor::RttMessage;
65pub use super::magicsock::{
66    ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddr, DirectAddrInfo, DirectAddrType,
67    DirectAddrsStream, RemoteInfo, Source,
68};
69
/// The delay to fall back to discovery when direct addresses fail.
///
/// When a connection is attempted with a [`NodeAddr`] containing direct addresses the
/// [`Endpoint`] assumes one of those addresses probably works.  If there is still no
/// connection after this delay, the configured [`Discovery`] is used as a fallback.
const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500);
76
/// Deferred constructor for a discovery service.
///
/// Stored by the [`Builder`] and invoked once in [`Builder::bind`] with the endpoint's
/// secret key.  Returning `None` means no service is contributed.
type DiscoveryBuilder = Box<dyn FnOnce(&SecretKey) -> Option<Box<dyn Discovery>> + Send + Sync>;
78
/// Builder for [`Endpoint`].
///
/// By default the endpoint will generate a new random [`SecretKey`], which will result in a
/// new [`NodeId`].
///
/// To create the [`Endpoint`] call [`Builder::bind`].
#[derive(Debug)]
pub struct Builder {
    /// Secret key of the endpoint; a fresh one is generated in [`Builder::bind`] if `None`.
    secret_key: Option<SecretKey>,
    /// Relay configuration, see [`Builder::relay_mode`].
    relay_mode: RelayMode,
    /// ALPN protocols accepted on incoming connections, see [`Builder::alpns`].
    alpn_protocols: Vec<Vec<u8>>,
    /// Custom QUIC transport configuration; the default is used if `None`.
    transport_config: Option<quinn::TransportConfig>,
    /// Whether TLS key logging is enabled, see [`Builder::keylog`].
    keylog: bool,
    /// Deferred discovery constructors, invoked with the secret key in [`Builder::bind`].
    #[debug(skip)]
    discovery: Vec<DiscoveryBuilder>,
    /// Optional proxy URL, see [`Builder::proxy_url`].
    proxy_url: Option<Url>,
    /// List of known nodes. See [`Builder::known_nodes`].
    node_map: Option<Vec<NodeAddr>>,
    /// Custom DNS resolver; the shared default resolver is used if `None`.
    dns_resolver: Option<DnsResolver>,
    /// Test-only switch, see [`Builder::insecure_skip_relay_cert_verify`].
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
    insecure_skip_relay_cert_verify: bool,
    /// IPv4 bind address, see [`Builder::bind_addr_v4`].
    addr_v4: Option<SocketAddrV4>,
    /// IPv6 bind address, see [`Builder::bind_addr_v6`].
    addr_v6: Option<SocketAddrV6>,
}
104
105impl Default for Builder {
106    fn default() -> Self {
107        Self {
108            secret_key: Default::default(),
109            relay_mode: default_relay_mode(),
110            alpn_protocols: Default::default(),
111            transport_config: Default::default(),
112            keylog: Default::default(),
113            discovery: Default::default(),
114            proxy_url: None,
115            node_map: None,
116            dns_resolver: None,
117            #[cfg(any(test, feature = "test-utils"))]
118            insecure_skip_relay_cert_verify: false,
119            addr_v4: None,
120            addr_v6: None,
121        }
122    }
123}
124
125impl Builder {
126    // The ordering of public methods is reflected directly in the documentation.  This is
127    // roughly ordered by what is most commonly needed by users.
128
129    // # The final constructor that everyone needs.
130
131    /// Binds the magic endpoint.
132    pub async fn bind(self) -> Result<Endpoint> {
133        let relay_map = self.relay_mode.relay_map();
134        let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate);
135        let static_config = StaticConfig {
136            transport_config: Arc::new(self.transport_config.unwrap_or_default()),
137            keylog: self.keylog,
138            secret_key: secret_key.clone(),
139        };
140        let dns_resolver = self
141            .dns_resolver
142            .unwrap_or_else(|| default_resolver().clone());
143        let discovery = self
144            .discovery
145            .into_iter()
146            .filter_map(|f| f(&secret_key))
147            .collect::<Vec<_>>();
148        let discovery: Option<Box<dyn Discovery>> = match discovery.len() {
149            0 => None,
150            1 => Some(discovery.into_iter().next().unwrap()),
151            _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))),
152        };
153        let msock_opts = magicsock::Options {
154            addr_v4: self.addr_v4,
155            addr_v6: self.addr_v6,
156            secret_key,
157            relay_map,
158            node_map: self.node_map,
159            discovery,
160            proxy_url: self.proxy_url,
161            dns_resolver,
162            #[cfg(any(test, feature = "test-utils"))]
163            insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
164        };
165        Endpoint::bind(static_config, msock_opts, self.alpn_protocols).await
166    }
167
168    // # The very common methods everyone basically needs.
169
170    /// Sets the IPv4 bind address.
171    ///
172    /// Setting the port to `0` will use a random port.
173    /// If the port specified is already in use, it will fallback to choosing a random port.
174    ///
175    /// By default will use `0.0.0.0:0` to bind to.
176    pub fn bind_addr_v4(mut self, addr: SocketAddrV4) -> Self {
177        self.addr_v4.replace(addr);
178        self
179    }
180
181    /// Sets the IPv6 bind address.
182    ///
183    /// Setting the port to `0` will use a random port.
184    /// If the port specified is already in use, it will fallback to choosing a random port.
185    ///
186    /// By default will use `[::]:0` to bind to.
187    pub fn bind_addr_v6(mut self, addr: SocketAddrV6) -> Self {
188        self.addr_v6.replace(addr);
189        self
190    }
191
192    /// Sets a secret key to authenticate with other peers.
193    ///
194    /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus
195    /// also its [`NodeId`]
196    ///
197    /// If not set, a new secret key will be generated.
198    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
199        self.secret_key = Some(secret_key);
200        self
201    }
202
203    /// Sets the [ALPN] protocols that this endpoint will accept on incoming connections.
204    ///
205    /// Not setting this will still allow creating connections, but to accept incoming
206    /// connections the [ALPN] must be set.
207    ///
208    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
209    pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
210        self.alpn_protocols = alpn_protocols;
211        self
212    }
213
214    // # Methods for common customisation items.
215
216    /// Sets the relay servers to assist in establishing connectivity.
217    ///
218    /// Relay servers are used to establish initial connection with another iroh-net node.
219    /// They also perform various functions related to hole punching, see the [crate docs]
220    /// for more details.
221    ///
222    /// By default the [number 0] relay servers are used, see [`RelayMode::Default`].
223    ///
224    /// When using [RelayMode::Custom], the provided `relay_map` must contain at least one
225    /// configured relay node.  If an invalid RelayMap is provided [`bind`]
226    /// will result in an error.
227    ///
228    /// [`bind`]: Builder::bind
229    /// [crate docs]: crate
230    /// [number 0]: https://n0.computer
231    pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
232        self.relay_mode = relay_mode;
233        self
234    }
235
236    /// Removes all discovery services from the builder.
237    pub fn clear_discovery(mut self) -> Self {
238        self.discovery.clear();
239        self
240    }
241
242    /// Optionally sets a discovery mechanism for this endpoint.
243    ///
244    /// If you want to combine multiple discovery services, you can use
245    /// [`Builder::add_discovery`] instead. This will internally create a
246    /// [`crate::discovery::ConcurrentDiscovery`].
247    ///
248    /// If no discovery service is set, connecting to a node without providing its
249    /// direct addresses or relay URLs will fail.
250    ///
251    /// See the documentation of the [`Discovery`] trait for details.
252    pub fn discovery(mut self, discovery: Box<dyn Discovery>) -> Self {
253        self.discovery.clear();
254        self.discovery.push(Box::new(move |_| Some(discovery)));
255        self
256    }
257
258    /// Adds a discovery mechanism for this endpoint.
259    ///
260    /// The function `discovery`
261    /// will be called on endpoint creation with the configured secret key of
262    /// the endpoint. Discovery services that need to publish information need
263    /// to use this secret key to sign the information.
264    ///
265    /// If you add multiple discovery services, they will be combined using a
266    /// [`crate::discovery::ConcurrentDiscovery`].
267    ///
268    /// If no discovery service is set, connecting to a node without providing its
269    /// direct addresses or relay URLs will fail.
270    ///
271    /// To clear all discovery services, use [`Builder::clear_discovery`].
272    ///
273    /// See the documentation of the [`Discovery`] trait for details.
274    pub fn add_discovery<F, D>(mut self, discovery: F) -> Self
275    where
276        F: FnOnce(&SecretKey) -> Option<D> + Send + Sync + 'static,
277        D: Discovery + 'static,
278    {
279        let discovery: DiscoveryBuilder =
280            Box::new(move |secret_key| discovery(secret_key).map(|x| Box::new(x) as _));
281        self.discovery.push(discovery);
282        self
283    }
284
285    /// Configures the endpoint to use the default n0 DNS discovery service.
286    ///
287    /// The default discovery service publishes to and resolves from the
288    /// n0.computer dns server `iroh.link`.
289    ///
290    /// This is equivalent to adding both a [`crate::discovery::pkarr::PkarrPublisher`]
291    /// and a [`crate::discovery::dns::DnsDiscovery`], both configured to use the
292    /// n0.computer dns server.
293    ///
294    /// This will by default use [`N0_DNS_PKARR_RELAY_PROD`].
295    /// When in tests, or when the `test-utils` feature is enabled, this will use the
296    /// [`N0_DNS_PKARR_RELAY_STAGING`].
297    ///
298    /// [`N0_DNS_PKARR_RELAY_PROD`]: crate::discovery::pkarr::N0_DNS_PKARR_RELAY_PROD
299    /// [`N0_DNS_PKARR_RELAY_STAGING`]: crate::discovery::pkarr::N0_DNS_PKARR_RELAY_STAGING
300    pub fn discovery_n0(mut self) -> Self {
301        self.discovery.push(Box::new(|secret_key| {
302            Some(Box::new(PkarrPublisher::n0_dns(secret_key.clone())))
303        }));
304        self.discovery
305            .push(Box::new(|_| Some(Box::new(DnsDiscovery::n0_dns()))));
306        self
307    }
308
309    #[cfg(feature = "discovery-pkarr-dht")]
310    /// Configures the endpoint to also use the mainline DHT with default settings.
311    ///
312    /// This is equivalent to adding a [`crate::discovery::pkarr::dht::DhtDiscovery`]
313    /// with default settings. Note that DhtDiscovery has various more advanced
314    /// configuration options. If you need any of those, you should manually
315    /// create a DhtDiscovery and add it with [`Builder::add_discovery`].
316    pub fn discovery_dht(mut self) -> Self {
317        use crate::discovery::pkarr::dht::DhtDiscovery;
318        self.discovery.push(Box::new(|secret_key| {
319            Some(Box::new(
320                DhtDiscovery::builder()
321                    .secret_key(secret_key.clone())
322                    .build()
323                    .unwrap(),
324            ))
325        }));
326        self
327    }
328
329    #[cfg(feature = "discovery-local-network")]
330    /// Configures the endpoint to also use local network discovery.
331    ///
332    /// This is equivalent to adding a [`crate::discovery::local_swarm_discovery::LocalSwarmDiscovery`]
333    /// with default settings. Note that LocalSwarmDiscovery has various more advanced
334    /// configuration options. If you need any of those, you should manually
335    /// create a LocalSwarmDiscovery and add it with [`Builder::add_discovery`].
336    pub fn discovery_local_network(mut self) -> Self {
337        use crate::discovery::local_swarm_discovery::LocalSwarmDiscovery;
338        self.discovery.push(Box::new(|secret_key| {
339            LocalSwarmDiscovery::new(secret_key.public())
340                .map(|x| Box::new(x) as _)
341                .ok()
342        }));
343        self
344    }
345
346    /// Optionally set a list of known nodes.
347    pub fn known_nodes(mut self, nodes: Vec<NodeAddr>) -> Self {
348        self.node_map = Some(nodes);
349        self
350    }
351
352    // # Methods for more specialist customisation.
353
354    /// Sets a custom [`quinn::TransportConfig`] for this endpoint.
355    ///
356    /// The transport config contains parameters governing the QUIC state machine.
357    ///
358    /// If unset, the default config is used. Default values should be suitable for most
359    /// internet applications. Applications protocols which forbid remotely-initiated
360    /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
361    /// zero.
362    pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self {
363        self.transport_config = Some(transport_config);
364        self
365    }
366
367    /// Optionally sets a custom DNS resolver to use for this endpoint.
368    ///
369    /// The DNS resolver is used to resolve relay hostnames, and node addresses if
370    /// [`crate::discovery::dns::DnsDiscovery`] is configured.
371    ///
372    /// By default, all endpoints share a DNS resolver, which is configured to use the
373    /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`]
374    /// here to use a differently configured DNS resolver for this endpoint.
375    pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
376        self.dns_resolver = Some(dns_resolver);
377        self
378    }
379
380    /// Sets an explicit proxy url to proxy all HTTP(S) traffic through.
381    pub fn proxy_url(mut self, url: Url) -> Self {
382        self.proxy_url.replace(url);
383        self
384    }
385
386    /// Sets the proxy url from the environment, in this order:
387    ///
388    /// - `HTTP_PROXY`
389    /// - `http_proxy`
390    /// - `HTTPS_PROXY`
391    /// - `https_proxy`
392    pub fn proxy_from_env(mut self) -> Self {
393        self.proxy_url = proxy_url_from_env();
394        self
395    }
396
397    /// Enables saving the TLS pre-master key for connections.
398    ///
399    /// This key should normally remain secret but can be useful to debug networking issues
400    /// by decrypting captured traffic.
401    ///
402    /// If *keylog* is `true` then setting the `SSLKEYLOGFILE` environment variable to a
403    /// filename will result in this file being used to log the TLS pre-master keys.
404    pub fn keylog(mut self, keylog: bool) -> Self {
405        self.keylog = keylog;
406        self
407    }
408
409    /// Skip verification of SSL certificates from relay servers
410    ///
411    /// May only be used in tests.
412    #[cfg(any(test, feature = "test-utils"))]
413    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
414    pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
415        self.insecure_skip_relay_cert_verify = skip_verify;
416        self
417    }
418}
419
/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.
#[derive(Debug)]
struct StaticConfig {
    /// Secret key of the endpoint, used when building the TLS configurations.
    secret_key: SecretKey,
    /// QUIC transport configuration applied to the server config.
    transport_config: Arc<quinn::TransportConfig>,
    /// Whether TLS key logging is requested, see [`Builder::keylog`].
    keylog: bool,
}
427
428impl StaticConfig {
429    /// Create a [`quinn::ServerConfig`] with the specified ALPN protocols.
430    fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> Result<ServerConfig> {
431        let server_config = make_server_config(
432            &self.secret_key,
433            alpn_protocols,
434            self.transport_config.clone(),
435            self.keylog,
436        )?;
437        Ok(server_config)
438    }
439}
440
441/// Creates a [`ServerConfig`] with the given secret key and limits.
442// This return type can not longer be used anywhere in our public API.  It is however still
443// used by iroh::node::Node (or rather iroh::node::Builder) to create a plain Quinn
444// endpoint.
445pub fn make_server_config(
446    secret_key: &SecretKey,
447    alpn_protocols: Vec<Vec<u8>>,
448    transport_config: Arc<TransportConfig>,
449    keylog: bool,
450) -> Result<ServerConfig> {
451    let quic_server_config = tls::make_server_config(secret_key, alpn_protocols, keylog)?;
452    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
453    server_config.transport_config(transport_config);
454
455    Ok(server_config)
456}
457
/// Controls an iroh-net node, establishing connections with other nodes.
///
/// This is the main API interface to create connections to, and accept connections from
/// other iroh-net nodes.  The connections are peer-to-peer and encrypted, a Relay server is
/// used to make the connections reliable.  See the [crate docs] for a more detailed
/// overview of iroh-net.
///
/// It is recommended to only create a single instance per application.  This ensures all
/// the connections made share the same peer-to-peer connections to other iroh-net nodes,
/// while still remaining independent connections.  This will result in more optimal network
/// behaviour.
///
/// New connections are typically created using the [`Endpoint::connect`] and
/// [`Endpoint::accept`] methods.  Once established, the [`Connection`] gives access to most
/// [QUIC] features.  Individual streams to send data to the peer are created using the
/// [`Connection::open_bi`], [`Connection::accept_bi`], [`Connection::open_uni`] and
/// [`Connection::accept_uni`] functions.
///
/// Note that due to the light-weight properties of streams a stream will only be accepted
/// once the initiating peer has sent some data on it.
///
/// [QUIC]: https://quicwg.org
/// [crate docs]: crate
#[derive(Clone, Debug)]
pub struct Endpoint {
    /// Handle to the magic socket the QUIC endpoint runs on.
    msock: Handle,
    /// The underlying QUIC endpoint.
    endpoint: quinn::Endpoint,
    /// Handle to the RTT actor, which is told about every new outgoing connection.
    rtt_actor: Arc<rtt_actor::RttHandle>,
    /// Cancellation token created together with the endpoint.
    cancel_token: CancellationToken,
    /// Settings fixed at bind time: secret key, transport config and keylog flag.
    static_config: Arc<StaticConfig>,
}
488
489impl Endpoint {
490    // The ordering of public methods is reflected directly in the documentation.  This is
491    // roughly ordered by what is most commonly needed by users, but grouped in similar
492    // items.
493
494    // # Methods relating to construction.
495
496    /// Returns the builder for an [`Endpoint`], with a production configuration.
497    pub fn builder() -> Builder {
498        Builder::default()
499    }
500
501    /// Creates a quinn endpoint backed by a magicsock.
502    ///
503    /// This is for internal use, the public interface is the [`Builder`] obtained from
504    /// [Self::builder]. See the methods on the builder for documentation of the parameters.
505    #[instrument("ep", skip_all, fields(me = %static_config.secret_key.public().fmt_short()))]
506    async fn bind(
507        static_config: StaticConfig,
508        msock_opts: magicsock::Options,
509        initial_alpns: Vec<Vec<u8>>,
510    ) -> Result<Self> {
511        let msock = magicsock::MagicSock::spawn(msock_opts).await?;
512        trace!("created magicsock");
513
514        let server_config = static_config.create_server_config(initial_alpns)?;
515
516        let mut endpoint_config = quinn::EndpointConfig::default();
517        // Setting this to false means that quinn will ignore packets that have the QUIC fixed bit
518        // set to 0. The fixed bit is the 3rd bit of the first byte of a packet.
519        // For performance reasons and to not rewrite buffers we pass non-QUIC UDP packets straight
520        // through to quinn. We set the first byte of the packet to zero, which makes quinn ignore
521        // the packet if grease_quic_bit is set to false.
522        endpoint_config.grease_quic_bit(false);
523
524        let endpoint = quinn::Endpoint::new_with_abstract_socket(
525            endpoint_config,
526            Some(server_config),
527            Arc::new(msock.clone()),
528            Arc::new(quinn::TokioRuntime),
529        )?;
530        trace!("created quinn endpoint");
531        debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");
532        Ok(Self {
533            msock,
534            endpoint,
535            rtt_actor: Arc::new(rtt_actor::RttHandle::new()),
536            cancel_token: CancellationToken::new(),
537            static_config: Arc::new(static_config),
538        })
539    }
540
541    /// Sets the list of accepted ALPN protocols.
542    ///
543    /// This will only affect new incoming connections.
544    /// Note that this *overrides* the current list of ALPNs.
545    pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) -> Result<()> {
546        let server_config = self.static_config.create_server_config(alpns)?;
547        self.endpoint.set_server_config(Some(server_config));
548        Ok(())
549    }
550
551    // # Methods for establishing connectivity.
552
553    /// Connects to a remote [`Endpoint`].
554    ///
555    /// A value that can be converted into a [`NodeAddr`] is required. This can be either a
556    /// [`NodeAddr`], a [`NodeId`] or a [`iroh_base::ticket::NodeTicket`].
557    ///
558    /// The [`NodeAddr`] must contain the [`NodeId`] to dial and may also contain a [`RelayUrl`]
559    /// and direct addresses. If direct addresses are provided, they will be used to try and
560    /// establish a direct connection without involving a relay server.
561    ///
562    /// If neither a [`RelayUrl`] or direct addresses are configured in the [`NodeAddr`] it
563    /// may still be possible a connection can be established.  This depends on other calls
564    /// to [`Endpoint::add_node_addr`] which may provide contact information, or via the
565    /// [`Discovery`] service configured using [`Builder::discovery`].  The discovery
566    /// service will also be used if the remote node is not reachable on the provided direct
567    /// addresses and there is no [`RelayUrl`].
568    ///
569    /// If addresses or relay servers are neither provided nor can be discovered, the
570    /// connection attempt will fail with an error.
571    ///
572    /// The `alpn`, or application-level protocol identifier, is also required. The remote
573    /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
574    /// an error.
575    #[instrument(skip_all, fields(me = %self.node_id().fmt_short(), alpn = ?String::from_utf8_lossy(alpn)))]
576    pub async fn connect(&self, node_addr: impl Into<NodeAddr>, alpn: &[u8]) -> Result<Connection> {
577        let node_addr = node_addr.into();
578        tracing::Span::current().record("remote", node_addr.node_id.fmt_short());
579        // Connecting to ourselves is not supported.
580        if node_addr.node_id == self.node_id() {
581            bail!(
582                "Connecting to ourself is not supported ({} is the node id of this node)",
583                node_addr.node_id.fmt_short()
584            );
585        }
586
587        if !node_addr.info.is_empty() {
588            self.add_node_addr(node_addr.clone())?;
589        }
590
591        let NodeAddr { node_id, info } = node_addr.clone();
592
593        // Get the mapped IPv6 address from the magic socket. Quinn will connect to this address.
594        // Start discovery for this node if it's enabled and we have no valid or verified
595        // address information for this node.
596        let (addr, discovery) = self
597            .get_mapping_addr_and_maybe_start_discovery(node_addr)
598            .await
599            .with_context(|| {
600                format!(
601                    "No addressing information for NodeId({}), unable to connect",
602                    node_id.fmt_short()
603                )
604            })?;
605
606        debug!(
607            "connecting to {}: (via {} - {:?})",
608            node_id, addr, info.direct_addresses
609        );
610
611        // Start connecting via quinn. This will time out after 10 seconds if no reachable address
612        // is available.
613        let conn = self.connect_quinn(node_id, alpn, addr).await;
614
615        // Cancel the node discovery task (if still running).
616        if let Some(discovery) = discovery {
617            discovery.cancel();
618        }
619
620        conn
621    }
622
623    /// Connects to a remote endpoint, using just the nodes's [`NodeId`].
624    ///
625    /// This is a convenience function for [`Endpoint::connect`].  It relies on addressing
626    /// information being provided by either the discovery service or using
627    /// [`Endpoint::add_node_addr`].  See [`Endpoint::connect`] for the details of how it
628    /// uses the discovery service to establish a connection to a remote node.
629    #[deprecated(
630        since = "0.27.0",
631        note = "Please use `connect` directly with a NodeId. This fn will be removed in 0.28.0."
632    )]
633    pub async fn connect_by_node_id(&self, node_id: NodeId, alpn: &[u8]) -> Result<Connection> {
634        let addr = NodeAddr::new(node_id);
635        self.connect(addr, alpn).await
636    }
637
638    #[instrument(
639        skip_all,
640        fields(remote_node = node_id.fmt_short(), alpn = %String::from_utf8_lossy(alpn))
641    )]
642    async fn connect_quinn(
643        &self,
644        node_id: NodeId,
645        alpn: &[u8],
646        addr: QuicMappedAddr,
647    ) -> Result<Connection> {
648        debug!("Attempting connection...");
649        let client_config = {
650            let alpn_protocols = vec![alpn.to_vec()];
651            let quic_client_config = tls::make_client_config(
652                &self.static_config.secret_key,
653                Some(node_id),
654                alpn_protocols,
655                self.static_config.keylog,
656            )?;
657            let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
658            let mut transport_config = quinn::TransportConfig::default();
659            transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
660            client_config.transport_config(Arc::new(transport_config));
661            client_config
662        };
663
664        // TODO: We'd eventually want to replace "localhost" with something that makes more sense.
665        let connect = self
666            .endpoint
667            .connect_with(client_config, addr.0, "localhost")?;
668
669        let connection = connect
670            .await
671            .context("failed connecting to remote endpoint")?;
672
673        let rtt_msg = RttMessage::NewConnection {
674            connection: connection.weak_handle(),
675            conn_type_changes: self.conn_type_stream(node_id)?,
676            node_id,
677        };
678        if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await {
679            // If this actor is dead, that's not great but we can still function.
680            warn!("rtt-actor not reachable: {err:#}");
681        }
682        debug!("Connection established");
683        Ok(connection)
684    }
685
686    /// Accepts an incoming connection on the endpoint.
687    ///
688    /// Only connections with the ALPNs configured in [`Builder::alpns`] will be accepted.
689    /// If multiple ALPNs have been configured the ALPN can be inspected before accepting
690    /// the connection using [`Connecting::alpn`].
691    ///
692    /// The returned future will yield `None` if the endpoint is closed by calling
693    /// [`Endpoint::close`].
694    pub fn accept(&self) -> Accept<'_> {
695        Accept {
696            inner: self.endpoint.accept(),
697            ep: self.clone(),
698        }
699    }
700
701    // # Methods for manipulating the internal state about other nodes.
702
703    /// Informs this [`Endpoint`] about addresses of the iroh-net node.
704    ///
705    /// This updates the local state for the remote node.  If the provided [`NodeAddr`]
706    /// contains a [`RelayUrl`] this will be used as the new relay server for this node.  If
707    /// it contains any new IP endpoints they will also be stored and tried when next
708    /// connecting to this node. Any address that matches this node's direct addresses will be
709    /// silently ignored.
710    ///
711    /// See also [`Endpoint::add_node_addr_with_source`].
712    ///
713    /// # Errors
714    ///
715    /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
716    /// direct addresses are a subset of ours.
717    pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> {
718        self.add_node_addr_inner(node_addr, magicsock::Source::App)
719    }
720
721    /// Informs this [`Endpoint`] about addresses of the iroh-net node, noting the source.
722    ///
723    /// This updates the local state for the remote node.  If the provided [`NodeAddr`] contains a
724    /// [`RelayUrl`] this will be used as the new relay server for this node.  If it contains any
725    /// new IP endpoints they will also be stored and tried when next connecting to this node. Any
726    /// address that matches this node's direct addresses will be silently ignored. The *source* is
727    /// used for logging exclusively and will not be stored.
728    ///
729    /// # Errors
730    ///
731    /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the
732    /// direct addresses are a subset of ours.
733    pub fn add_node_addr_with_source(
734        &self,
735        node_addr: NodeAddr,
736        source: &'static str,
737    ) -> Result<()> {
738        self.add_node_addr_inner(
739            node_addr,
740            magicsock::Source::NamedApp {
741                name: source.into(),
742            },
743        )
744    }
745
746    fn add_node_addr_inner(&self, node_addr: NodeAddr, source: magicsock::Source) -> Result<()> {
747        // Connecting to ourselves is not supported.
748        if node_addr.node_id == self.node_id() {
749            bail!(
750                "Adding our own address is not supported ({} is the node id of this node)",
751                node_addr.node_id.fmt_short()
752            );
753        }
754        self.msock.add_node_addr(node_addr, source)
755    }
756
757    // # Getter methods for properties of this Endpoint itself.
758
    /// Returns the [`SecretKey`] of this endpoint.
    ///
    /// This is the key held in the endpoint's static configuration; its public half is
    /// what [`Endpoint::node_id`] returns.
    pub fn secret_key(&self) -> &SecretKey {
        &self.static_config.secret_key
    }
763
    /// Returns the node id of this endpoint.
    ///
    /// This ID is the unique addressing information of this node and other peers must know
    /// it to be able to connect to this node.
    pub fn node_id(&self) -> NodeId {
        // The node id is the public key belonging to the endpoint's secret key.
        self.static_config.secret_key.public()
    }
771
772    /// Returns the current [`NodeAddr`] for this endpoint.
773    ///
774    /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and local IP endpoints
775    /// as they would be returned by [`Endpoint::home_relay`] and
776    /// [`Endpoint::direct_addresses`].
777    pub async fn node_addr(&self) -> Result<NodeAddr> {
778        let addrs = self
779            .direct_addresses()
780            .next()
781            .await
782            .ok_or(anyhow!("No IP endpoints found"))?;
783        let relay = self.home_relay();
784        Ok(NodeAddr::from_parts(
785            self.node_id(),
786            relay,
787            addrs.into_iter().map(|x| x.addr),
788        ))
789    }
790
    /// Returns the [`RelayUrl`] of the Relay server used as home relay.
    ///
    /// Every endpoint has a home Relay server which it chooses as the server with the
    /// lowest latency out of the configured servers provided by [`Builder::relay_mode`].
    /// This is the server other iroh-net nodes can use to reliably establish a connection
    /// to this node.
    ///
    /// Returns `None` if we are not connected to any Relay server.
    ///
    /// Note that this will be `None` right after the [`Endpoint`] is created since it takes
    /// some time to find and connect to the home relay server.  Use
    /// [`Endpoint::watch_home_relay`] to wait until the home relay server is available.
    pub fn home_relay(&self) -> Option<RelayUrl> {
        self.msock.my_relay()
    }
806
    /// Watches for changes to the home relay.
    ///
    /// If there is currently a home relay it will be yielded immediately as the first item
    /// in the stream.  This makes it possible to use this function to wait for the initial
    /// home relay to be known.
    ///
    /// Note that it is not guaranteed that a home relay will ever become available.  If no
    /// servers are configured with [`Builder::relay_mode`] this stream will never yield an
    /// item.
    ///
    /// See [`Endpoint::home_relay`] to read the current home relay without waiting.
    pub fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
        self.msock.watch_home_relay()
    }
819
    /// Returns the direct addresses of this [`Endpoint`].
    ///
    /// The direct addresses of the [`Endpoint`] are those that could be used by other
    /// iroh-net nodes to establish direct connectivity, depending on the network
    /// situation. The yielded lists of direct addresses contain both the locally-bound
    /// addresses and the [`Endpoint`]'s publicly reachable addresses discovered through
    /// mechanisms such as [STUN] and port mapping.  Hence usually only a subset of these
    /// will be applicable to a certain remote iroh-net node.
    ///
    /// The [`Endpoint`] continuously monitors the direct addresses for changes as its own
    /// location in the network might change.  Whenever changes are detected this stream
    /// will yield a new list of direct addresses.
    ///
    /// When issuing the first call to this method the first direct address discovery might
    /// still be underway, in this case the first item of the returned stream will not be
    /// immediately available.  Once this first set of local IP endpoints is discovered the
    /// stream will always return the first set of IP endpoints immediately, which are the
    /// most recently discovered IP endpoints.
    ///
    /// # Examples
    ///
    /// To get the current endpoints, drop the stream after the first item was received:
    /// ```
    /// use futures_lite::StreamExt;
    /// use iroh_net::Endpoint;
    ///
    /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// # rt.block_on(async move {
    /// let mep = Endpoint::builder().bind().await.unwrap();
    /// let _addrs = mep.direct_addresses().next().await;
    /// # });
    /// ```
    ///
    /// [STUN]: https://en.wikipedia.org/wiki/STUN
    pub fn direct_addresses(&self) -> DirectAddrsStream {
        self.msock.direct_addresses()
    }
857
    /// Returns the local socket addresses on which the underlying sockets are bound.
    ///
    /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
    /// address if available.
    ///
    /// The tuple is passed through unchanged from the magic socket's `local_addr`.
    /// Presumably the first element is the IPv4 address and the optional second element
    /// the IPv6 address (TODO: confirm against `local_addr`).
    pub fn bound_sockets(&self) -> (SocketAddr, Option<SocketAddr>) {
        self.msock.local_addr()
    }
865
866    // # Getter methods for information about other nodes.
867
    /// Returns information about the remote node identified by a [`NodeId`].
    ///
    /// The [`Endpoint`] keeps some information about remote iroh-net nodes, which it uses to find
    /// the best path to a node. Having information on a remote node, however, does not mean we
    /// have ever connected to it or that a connection is even possible. The information about a
    /// remote node will change over time, as the [`Endpoint`] learns more about the node. Future
    /// calls may return different information. Furthermore, node information may even be
    /// completely evicted as it becomes stale.
    ///
    /// See also [`Endpoint::remote_info_iter`] which returns information on all nodes known
    /// by this [`Endpoint`].
    pub fn remote_info(&self, node_id: NodeId) -> Option<RemoteInfo> {
        self.msock.remote_info(node_id)
    }
882
    /// Returns information about all the remote nodes this [`Endpoint`] knows about.
    ///
    /// This returns the same information as [`Endpoint::remote_info`] for each node known to this
    /// [`Endpoint`].
    ///
    /// The [`Endpoint`] keeps some information about remote iroh-net nodes, which it uses to find
    /// the best path to a node. This returns all the nodes it knows about, regardless of whether a
    /// connection was ever made or is even possible.
    ///
    /// See also [`Endpoint::remote_info`] to only retrieve information about a single node.
    pub fn remote_info_iter(&self) -> impl Iterator<Item = RemoteInfo> {
        // `list_remote_infos` hands back an owned collection that is consumed here, so the
        // iterator presumably reflects a snapshot taken at call time (TODO: confirm).
        self.msock.list_remote_infos().into_iter()
    }
896
897    // # Methods for less common getters.
898    //
899    // Partially they return things passed into the builder.
900
    /// Returns a stream that reports connection type changes for the remote node.
    ///
    /// This returns a stream of [`ConnectionType`] items: each time the underlying
    /// connection to a remote node changes, it yields an item.  These connection changes
    /// occur when the connection switches between using the Relay server and a direct
    /// connection.
    ///
    /// If there is currently a connection with the remote node the first item in the stream
    /// will yield immediately returning the current connection type.
    ///
    /// Note that this does not guarantee each connection change is yielded in the stream.
    /// If the connection type changes several times before this stream is polled only the
    /// last recorded state is returned.  This can be observed e.g. right at the start of a
    /// connection when the switch from a relayed to a direct connection can be so fast that
    /// the relayed state is never exposed.
    ///
    /// # Errors
    ///
    /// Will error if we do not have any address information for the given `node_id`.
    pub fn conn_type_stream(&self, node_id: NodeId) -> Result<ConnectionTypeStream> {
        self.msock.conn_type_stream(node_id)
    }
922
    /// Returns the DNS resolver used in this [`Endpoint`].
    ///
    /// See [`Builder::dns_resolver`].
    pub fn dns_resolver(&self) -> &DnsResolver {
        self.msock.dns_resolver()
    }
929
    /// Returns the discovery mechanism, if configured.
    ///
    /// See [`Builder::discovery`].
    pub fn discovery(&self) -> Option<&dyn Discovery> {
        self.msock.discovery()
    }
936
937    // # Methods for less common state updates.
938
    /// Notifies the system of potential network changes.
    ///
    /// On many systems iroh is able to detect network changes by itself, however
    /// some systems like Android do not expose this functionality to native code.
    /// Android does however provide this functionality to Java code.  This
    /// function allows for notifying iroh of any potential network changes like
    /// this.
    ///
    /// Even when the network did not change, or iroh was already able to detect
    /// the network change itself, there is no harm in calling this function.
    pub async fn network_change(&self) {
        self.msock.network_change().await;
    }
952
953    // # Methods for terminating the endpoint.
954
    /// Closes the QUIC endpoint and the magic socket.
    ///
    /// This will close all open QUIC connections with the provided error_code and
    /// reason. See [`quinn::Connection`] for details on how these are interpreted.
    ///
    /// It will then wait for all connections to actually be shutdown, and afterwards
    /// close the magic socket.
    ///
    /// # Errors
    ///
    /// Returns an error if closing the magic socket failed.
    /// TODO: Document error cases.
    pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> {
        // Take the endpoint apart so the quinn endpoint can be dropped on its own below.
        let Endpoint {
            msock,
            endpoint,
            cancel_token,
            ..
        } = self;
        // Cancel first, so that anything waiting on `Endpoint::cancelled` is woken before
        // the connections are torn down.
        cancel_token.cancel();
        tracing::debug!("Closing connections");
        endpoint.close(error_code, reason);
        endpoint.wait_idle().await;
        // In case this is the last clone of `Endpoint`, dropping the `quinn::Endpoint` will
        // make it more likely that the underlying socket is not polled by quinn anymore after this
        drop(endpoint);
        tracing::debug!("Connections closed");

        // Only once the QUIC side is idle is the magic socket itself closed.
        msock.close().await?;
        Ok(())
    }
984
985    // # Remaining private methods
986
    /// Returns a future that completes once this endpoint's cancellation token is cancelled.
    ///
    /// The token is cancelled at the start of [`Endpoint::close`].
    pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> {
        self.cancel_token.cancelled()
    }
990
991    /// Return the quic mapped address for this `node_id` and possibly start discovery
992    /// services if discovery is enabled on this magic endpoint.
993    ///
994    /// This will launch discovery in all cases except if:
995    /// 1) we do not have discovery enabled
996    /// 2) we have discovery enabled, but already have at least one verified, unexpired
997    ///    addresses for this `node_id`
998    ///
999    /// # Errors
1000    ///
1001    /// This method may fail if we have no way of dialing the node. This can occur if
1002    /// we were given no dialing information in the [`NodeAddr`] and no discovery
1003    /// services were configured or if discovery failed to fetch any dialing information.
1004    async fn get_mapping_addr_and_maybe_start_discovery(
1005        &self,
1006        node_addr: NodeAddr,
1007    ) -> Result<(QuicMappedAddr, Option<DiscoveryTask>)> {
1008        let node_id = node_addr.node_id;
1009
1010        // Only return a mapped addr if we have some way of dialing this node, in other
1011        // words, we have either a relay URL or at least one direct address.
1012        let addr = if self.msock.has_send_address(node_id) {
1013            self.msock.get_mapping_addr(node_id)
1014        } else {
1015            None
1016        };
1017        match addr {
1018            Some(addr) => {
1019                // We have some way of dialing this node, but that doesn't actually mean
1020                // we can actually connect to any of these addresses.
1021                // Therefore, we will invoke the discovery service if we haven't received from the
1022                // endpoint on any of the existing paths recently.
1023                // If the user provided addresses in this connect call, we will add a delay
1024                // followed by a recheck before starting the discovery, to give the magicsocket a
1025                // chance to test the newly provided addresses.
1026                let delay = (!node_addr.info.is_empty()).then_some(DISCOVERY_WAIT_PERIOD);
1027                let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay)
1028                    .ok()
1029                    .flatten();
1030                Ok((addr, discovery))
1031            }
1032
1033            None => {
1034                // We have no known addresses or relay URLs for this node.
1035                // So, we start a discovery task and wait for the first result to arrive, and
1036                // only then continue, because otherwise we wouldn't have any
1037                // path to the remote endpoint.
1038                let mut discovery = DiscoveryTask::start(self.clone(), node_id)
1039                    .context("Discovery service required due to missing addressing information")?;
1040                discovery
1041                    .first_arrived()
1042                    .await
1043                    .context("Discovery service failed")?;
1044                if let Some(addr) = self.msock.get_mapping_addr(node_id) {
1045                    Ok((addr, Some(discovery)))
1046                } else {
1047                    bail!("Discovery did not find addressing information");
1048                }
1049            }
1050        }
1051    }
1052
    /// Returns a clone of the magic socket [`Handle`] used by this endpoint (tests only).
    #[cfg(test)]
    pub(crate) fn magic_sock(&self) -> Handle {
        self.msock.clone()
    }
    /// Returns a reference to the wrapped [`quinn::Endpoint`] (tests only).
    #[cfg(test)]
    pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
        &self.endpoint
    }
1061}
1062
/// Future produced by [`Endpoint::accept`].
///
/// Resolves to an [`Incoming`] for the next incoming connection, or to `None` when the
/// wrapped `quinn::Accept` future resolves to `None`.
#[derive(Debug)]
#[pin_project]
pub struct Accept<'a> {
    // The wrapped quinn future; pinned because it is polled in place.
    #[pin]
    #[debug("quinn::Accept")]
    inner: quinn::Accept<'a>,
    // Cloned into every `Incoming` this future produces.
    ep: Endpoint,
}
1072
1073impl<'a> Future for Accept<'a> {
1074    type Output = Option<Incoming>;
1075
1076    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1077        let this = self.project();
1078        match this.inner.poll(cx) {
1079            Poll::Pending => Poll::Pending,
1080            Poll::Ready(None) => Poll::Ready(None),
1081            Poll::Ready(Some(inner)) => Poll::Ready(Some(Incoming {
1082                inner,
1083                ep: this.ep.clone(),
1084            })),
1085        }
1086    }
1087}
1088
/// An incoming connection for which the server has not yet begun its parts of the
/// handshake.
///
/// It can be accepted (directly or by awaiting it), refused, retried or ignored.
#[derive(Debug)]
pub struct Incoming {
    // The wrapped quinn incoming connection that all methods delegate to.
    inner: quinn::Incoming,
    // Endpoint handle passed on to the `Connecting` / `IncomingFuture` created from this.
    ep: Endpoint,
}
1096
1097impl Incoming {
1098    /// Attempts to accept this incoming connection (an error may still occur).
1099    ///
1100    /// Errors occurring here are likely not caused by the application or remote.  The QUIC
1101    /// connection listens on a normal UDP socket and any reachable network endpoint can
1102    /// send datagrams to it, solicited or not.  Even if the first few bytes look like a
1103    /// QUIC packet, it might not even be a QUIC packet that is being received.
1104    ///
1105    /// Thus it is common to simply log the errors here and accept them as something which
1106    /// can happen.
1107    pub fn accept(self) -> Result<Connecting, ConnectionError> {
1108        self.inner.accept().map(|conn| Connecting {
1109            inner: conn,
1110            ep: self.ep,
1111        })
1112    }
1113
1114    /// Accepts this incoming connection using a custom configuration.
1115    ///
1116    /// See [`accept()`] for more details.
1117    ///
1118    /// [`accept()`]: Incoming::accept
1119    pub fn accept_with(
1120        self,
1121        server_config: Arc<ServerConfig>,
1122    ) -> Result<Connecting, ConnectionError> {
1123        self.inner
1124            .accept_with(server_config)
1125            .map(|conn| Connecting {
1126                inner: conn,
1127                ep: self.ep,
1128            })
1129    }
1130
1131    /// Rejects this incoming connection attempt.
1132    pub fn refuse(self) {
1133        self.inner.refuse()
1134    }
1135
1136    /// Responds with a retry packet.
1137    ///
1138    /// This requires the client to retry with address validation.
1139    ///
1140    /// Errors if `remote_address_validated()` is true.
1141    pub fn retry(self) -> Result<(), RetryError> {
1142        self.inner.retry()
1143    }
1144
1145    /// Ignores this incoming connection attempt, not sending any packet in response.
1146    pub fn ignore(self) {
1147        self.inner.ignore()
1148    }
1149
1150    /// Returns the local IP address which was used when the peer established the
1151    /// connection.
1152    pub fn local_ip(&self) -> Option<IpAddr> {
1153        self.inner.local_ip()
1154    }
1155
1156    /// Returns the peer's UDP address.
1157    pub fn remote_address(&self) -> SocketAddr {
1158        self.inner.remote_address()
1159    }
1160
1161    /// Whether the socket address that is initiating this connection has been validated.
1162    ///
1163    /// This means that the sender of the initial packet has proved that they can receive
1164    /// traffic sent to `self.remote_address()`.
1165    pub fn remote_address_validated(&self) -> bool {
1166        self.inner.remote_address_validated()
1167    }
1168}
1169
1170impl IntoFuture for Incoming {
1171    type Output = Result<Connection, ConnectionError>;
1172    type IntoFuture = IncomingFuture;
1173
1174    fn into_future(self) -> Self::IntoFuture {
1175        IncomingFuture {
1176            inner: self.inner.into_future(),
1177            ep: self.ep,
1178        }
1179    }
1180}
1181
/// Adaptor to let [`Incoming`] be `await`ed like a [`Connecting`].
///
/// Created by the [`IntoFuture`] implementation of [`Incoming`].
#[derive(Debug)]
#[pin_project]
pub struct IncomingFuture {
    // The wrapped quinn future; pinned because it is polled in place.
    #[pin]
    inner: quinn::IncomingFuture,
    // Used to notify the rtt-actor once the connection is established.
    ep: Endpoint,
}
1190
1191impl Future for IncomingFuture {
1192    type Output = Result<quinn::Connection, ConnectionError>;
1193
1194    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1195        let this = self.project();
1196        match this.inner.poll(cx) {
1197            Poll::Pending => Poll::Pending,
1198            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1199            Poll::Ready(Ok(conn)) => {
1200                try_send_rtt_msg(&conn, this.ep);
1201                Poll::Ready(Ok(conn))
1202            }
1203        }
1204    }
1205}
1206
/// In-progress connection attempt future
///
/// Resolves to the established [`Connection`] or to a [`ConnectionError`].
#[derive(Debug)]
#[pin_project]
pub struct Connecting {
    // The wrapped quinn future; pinned because it is polled in place.
    #[pin]
    inner: quinn::Connecting,
    // Used to notify the rtt-actor once the connection is established.
    ep: Endpoint,
}
1215
1216impl Connecting {
1217    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security.
1218    pub fn into_0rtt(self) -> Result<(Connection, ZeroRttAccepted), Self> {
1219        match self.inner.into_0rtt() {
1220            Ok((conn, zrtt_accepted)) => {
1221                try_send_rtt_msg(&conn, &self.ep);
1222                Ok((conn, zrtt_accepted))
1223            }
1224            Err(inner) => Err(Self { inner, ep: self.ep }),
1225        }
1226    }
1227
1228    /// Parameters negotiated during the handshake
1229    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
1230        self.inner.handshake_data().await
1231    }
1232
1233    /// The local IP address which was used when the peer established the connection.
1234    pub fn local_ip(&self) -> Option<IpAddr> {
1235        self.inner.local_ip()
1236    }
1237
1238    /// The peer's UDP address.
1239    pub fn remote_address(&self) -> SocketAddr {
1240        self.inner.remote_address()
1241    }
1242
1243    /// Extracts the ALPN protocol from the peer's handshake data.
1244    // Note, we could totally provide this method to be on a Connection as well.  But we'd
1245    // need to wrap Connection too.
1246    pub async fn alpn(&mut self) -> Result<Vec<u8>> {
1247        let data = self.handshake_data().await?;
1248        match data.downcast::<quinn::crypto::rustls::HandshakeData>() {
1249            Ok(data) => match data.protocol {
1250                Some(protocol) => Ok(protocol),
1251                None => bail!("no ALPN protocol available"),
1252            },
1253            Err(_) => bail!("unknown handshake type"),
1254        }
1255    }
1256}
1257
1258impl Future for Connecting {
1259    type Output = Result<Connection, ConnectionError>;
1260
1261    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1262        let this = self.project();
1263        match this.inner.poll(cx) {
1264            Poll::Pending => Poll::Pending,
1265            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1266            Poll::Ready(Ok(conn)) => {
1267                try_send_rtt_msg(&conn, this.ep);
1268                Poll::Ready(Ok(conn))
1269            }
1270        }
1271    }
1272}
1273
1274/// Extract the [`PublicKey`] from the peer's TLS certificate.
1275// TODO: make this a method now
1276pub fn get_remote_node_id(connection: &Connection) -> Result<PublicKey> {
1277    let data = connection.peer_identity();
1278    match data {
1279        None => bail!("no peer certificate found"),
1280        Some(data) => match data.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
1281            Ok(certs) => {
1282                if certs.len() != 1 {
1283                    bail!(
1284                        "expected a single peer certificate, but {} found",
1285                        certs.len()
1286                    );
1287                }
1288                let cert = tls::certificate::parse(&certs[0])?;
1289                Ok(cert.peer_id())
1290            }
1291            Err(_) => bail!("invalid peer certificate"),
1292        },
1293    }
1294}
1295
1296/// Try send a message to the rtt-actor.
1297///
1298/// If we can't notify the actor that will impact performance a little, but we can still
1299/// function.
1300fn try_send_rtt_msg(conn: &Connection, magic_ep: &Endpoint) {
1301    // If we can't notify the rtt-actor that's not great but not critical.
1302    let Ok(peer_id) = get_remote_node_id(conn) else {
1303        warn!(?conn, "failed to get remote node id");
1304        return;
1305    };
1306    let Ok(conn_type_changes) = magic_ep.conn_type_stream(peer_id) else {
1307        warn!(?conn, "failed to create conn_type_stream");
1308        return;
1309    };
1310    let rtt_msg = RttMessage::NewConnection {
1311        connection: conn.weak_handle(),
1312        conn_type_changes,
1313        node_id: peer_id,
1314    };
1315    if let Err(err) = magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) {
1316        warn!(?conn, "rtt-actor not reachable: {err:#}");
1317    }
1318}
1319
1320/// Read a proxy url from the environment, in this order
1321///
1322/// - `HTTP_PROXY`
1323/// - `http_proxy`
1324/// - `HTTPS_PROXY`
1325/// - `https_proxy`
1326fn proxy_url_from_env() -> Option<Url> {
1327    if let Some(url) = std::env::var("HTTP_PROXY")
1328        .ok()
1329        .and_then(|s| s.parse::<Url>().ok())
1330    {
1331        if is_cgi() {
1332            warn!("HTTP_PROXY environment variable ignored in CGI");
1333        } else {
1334            return Some(url);
1335        }
1336    }
1337    if let Some(url) = std::env::var("http_proxy")
1338        .ok()
1339        .and_then(|s| s.parse::<Url>().ok())
1340    {
1341        return Some(url);
1342    }
1343    if let Some(url) = std::env::var("HTTPS_PROXY")
1344        .ok()
1345        .and_then(|s| s.parse::<Url>().ok())
1346    {
1347        return Some(url);
1348    }
1349    if let Some(url) = std::env::var("https_proxy")
1350        .ok()
1351        .and_then(|s| s.parse::<Url>().ok())
1352    {
1353        return Some(url);
1354    }
1355
1356    None
1357}
1358
1359/// Returns the default relay mode.
1360///
1361/// If the `IROH_FORCE_STAGING_RELAYS` environment variable is non empty, it will return `RelayMode::Staging`.
1362/// Otherwise, it will return `RelayMode::Default`.
1363pub fn default_relay_mode() -> RelayMode {
1364    // Use staging in testing
1365    match force_staging_infra() {
1366        true => RelayMode::Staging,
1367        false => RelayMode::Default,
1368    }
1369}
1370
/// Tells whether this process is running in a CGI context.
///
/// In CGI a malicious client can send a `Proxy:` header, which then shows up in the
/// `HTTP_PROXY` env var, so that variable must not be trusted there.  A CGI context is
/// recognised by the `REQUEST_METHOD` env var being set.
fn is_cgi() -> bool {
    let request_method = std::env::var_os("REQUEST_METHOD");
    request_method.is_some()
}
1378
// TODO: These tests could still be flaky, let's fix that:
1380// https://github.com/n0-computer/iroh/issues/1183
1381#[cfg(test)]
1382mod tests {
1383
1384    use std::time::Instant;
1385
1386    use iroh_test::CallOnDrop;
1387    use rand::SeedableRng;
1388    use tracing::{error_span, info, info_span, Instrument};
1389
1390    use super::*;
1391    use crate::test_utils::{run_relay_server, run_relay_server_with};
1392
1393    const TEST_ALPN: &[u8] = b"n0/iroh/test";
1394
    /// Checks the `Debug` output of [`AddrInfo`], including how the relay URL is rendered.
    #[test]
    fn test_addr_info_debug() {
        let info = AddrInfo {
            relay_url: Some("https://relay.example.com".parse().unwrap()),
            direct_addresses: vec![SocketAddr::from(([1, 2, 3, 4], 1234))]
                .into_iter()
                .collect(),
        };
        // Note the trailing dot after the host in the expected output: the parsed relay
        // URL is printed as `relay.example.com./`, not as the input string.
        assert_eq!(
            format!("{:?}", info),
            r#"AddrInfo { relay_url: Some(RelayUrl("https://relay.example.com./")), direct_addresses: {1.2.3.4:1234} }"#
        );
    }
1408
    /// Checks that an endpoint refuses both to connect to itself and to add its own
    /// address to the node map.
    #[tokio::test]
    async fn test_connect_self() {
        let _guard = iroh_test::logging::setup();
        let ep = Endpoint::builder()
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await
            .unwrap();
        let my_addr = ep.node_addr().await.unwrap();
        // Connecting to our own address must fail with the dedicated error message.
        let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
        assert!(res.is_err());
        let err = res.err().unwrap();
        assert!(err.to_string().starts_with("Connecting to ourself"));

        // Adding our own address must be rejected as well.
        let res = ep.add_node_addr(my_addr);
        assert!(res.is_err());
        let err = res.err().unwrap();
        assert!(err.to_string().starts_with("Adding our own address"));
    }
1428
    /// Checks what both sides observe when the server closes a connection.
    ///
    /// The server accepts one uni stream, reads five bytes and closes the connection with
    /// error code 7 and reason `bye`.  The server must then see `LocallyClosed` errors,
    /// while the client must see the matching `ApplicationClosed` error.
    #[tokio::test]
    async fn endpoint_connect_close() {
        let _guard = iroh_test::logging::setup();
        let (relay_map, relay_url, _guard) = run_relay_server().await.unwrap();
        let server_secret_key = SecretKey::generate();
        let server_peer_id = server_secret_key.public();

        let server = {
            let relay_map = relay_map.clone();
            tokio::spawn(
                async move {
                    let ep = Endpoint::builder()
                        .secret_key(server_secret_key)
                        .alpns(vec![TEST_ALPN.to_vec()])
                        .relay_mode(RelayMode::Custom(relay_map))
                        .insecure_skip_relay_cert_verify(true)
                        .bind()
                        .await
                        .unwrap();
                    info!("accepting connection");
                    let incoming = ep.accept().await.unwrap();
                    let conn = incoming.await.unwrap();
                    let mut stream = conn.accept_uni().await.unwrap();
                    let mut buf = [0u8; 5];
                    stream.read_exact(&mut buf).await.unwrap();
                    info!("Accepted 1 stream, received {buf:?}.  Closing now.");
                    // close the connection
                    conn.close(7u8.into(), b"bye");

                    // After a local close, new streams and pending reads both fail with
                    // `LocallyClosed`.
                    let res = conn.accept_uni().await;
                    assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);

                    let res = stream.read_to_end(10).await;
                    assert_eq!(
                        res.unwrap_err(),
                        quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
                            quinn::ConnectionError::LocallyClosed
                        ))
                    );
                    info!("server test completed");
                }
                .instrument(info_span!("test-server")),
            )
        };

        let client = tokio::spawn(
            async move {
                let ep = Endpoint::builder()
                    .alpns(vec![TEST_ALPN.to_vec()])
                    .relay_mode(RelayMode::Custom(relay_map))
                    .insecure_skip_relay_cert_verify(true)
                    .bind()
                    .await
                    .unwrap();
                info!("client connecting");
                // Only the relay URL is given, no direct addresses.
                let node_addr = NodeAddr::new(server_peer_id).with_relay_url(relay_url);
                let conn = ep.connect(node_addr, TEST_ALPN).await.unwrap();
                let mut stream = conn.open_uni().await.unwrap();

                // First write is accepted by server.  We need this bit of synchronisation
                // because if the server closes after simply accepting the connection we can
                // not be sure our .open_uni() call would succeed as it may already receive
                // the error.
                stream.write_all(b"hello").await.unwrap();

                info!("waiting for closed");
                // Remote now closes the connection, we should see an error sometime soon.
                let err = conn.closed().await;
                let expected_err =
                    quinn::ConnectionError::ApplicationClosed(quinn::ApplicationClose {
                        error_code: 7u8.into(),
                        reason: b"bye".to_vec().into(),
                    });
                assert_eq!(err, expected_err);

                info!("opening new - expect it to fail");
                let res = conn.open_uni().await;
                assert_eq!(res.unwrap_err(), expected_err);
                info!("client test completed");
            }
            .instrument(info_span!("test-client")),
        );

        // Run both tasks to completion, but never let the test hang for more than 30s.
        let (server, client) = tokio::time::timeout(
            Duration::from_secs(30),
            futures_lite::future::zip(server, client),
        )
        .await
        .expect("timeout");
        server.unwrap();
        client.unwrap();
    }
1521
1522    /// Test that peers are properly restored
1523    #[tokio::test]
1524    async fn restore_peers() {
1525        let _guard = iroh_test::logging::setup();
1526
1527        let secret_key = SecretKey::generate();
1528
1529        /// Create an endpoint for the test.
1530        async fn new_endpoint(secret_key: SecretKey, nodes: Option<Vec<NodeAddr>>) -> Endpoint {
1531            let mut transport_config = quinn::TransportConfig::default();
1532            transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
1533
1534            let mut builder = Endpoint::builder()
1535                .secret_key(secret_key.clone())
1536                .transport_config(transport_config);
1537            if let Some(nodes) = nodes {
1538                builder = builder.known_nodes(nodes);
1539            }
1540            builder
1541                .alpns(vec![TEST_ALPN.to_vec()])
1542                .bind()
1543                .await
1544                .unwrap()
1545        }
1546
1547        // create the peer that will be added to the peer map
1548        let peer_id = SecretKey::generate().public();
1549        let direct_addr: SocketAddr =
1550            (std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 8758u16).into();
1551        let node_addr = NodeAddr::new(peer_id).with_direct_addresses([direct_addr]);
1552
1553        info!("setting up first endpoint");
1554        // first time, create a magic endpoint without peers but a peers file and add addressing
1555        // information for a peer
1556        let endpoint = new_endpoint(secret_key.clone(), None).await;
1557        assert_eq!(endpoint.remote_info_iter().count(), 0);
1558        endpoint.add_node_addr(node_addr.clone()).unwrap();
1559
1560        // Grab the current addrs
1561        let node_addrs: Vec<NodeAddr> = endpoint.remote_info_iter().map(Into::into).collect();
1562        assert_eq!(node_addrs.len(), 1);
1563        assert_eq!(node_addrs[0], node_addr);
1564
1565        info!("closing endpoint");
1566        // close the endpoint and restart it
1567        endpoint.close(0u32.into(), b"done").await.unwrap();
1568
1569        info!("restarting endpoint");
1570        // now restart it and check the addressing info of the peer
1571        let endpoint = new_endpoint(secret_key, Some(node_addrs)).await;
1572        let RemoteInfo { mut addrs, .. } = endpoint.remote_info(peer_id).unwrap();
1573        let conn_addr = addrs.pop().unwrap().addr;
1574        assert_eq!(conn_addr, direct_addr);
1575    }
1576
1577    #[tokio::test]
1578    async fn endpoint_relay_connect_loop() {
1579        let _logging_guard = iroh_test::logging::setup();
1580        let start = Instant::now();
1581        let n_clients = 5;
1582        let n_chunks_per_client = 2;
1583        let chunk_size = 10;
1584        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
1585        let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
1586        let server_secret_key = SecretKey::generate_with_rng(&mut rng);
1587        let server_node_id = server_secret_key.public();
1588
1589        // The server accepts the connections of the clients sequentially.
1590        let server = {
1591            let relay_map = relay_map.clone();
1592            tokio::spawn(
1593                async move {
1594                    let ep = Endpoint::builder()
1595                        .insecure_skip_relay_cert_verify(true)
1596                        .secret_key(server_secret_key)
1597                        .alpns(vec![TEST_ALPN.to_vec()])
1598                        .relay_mode(RelayMode::Custom(relay_map))
1599                        .bind()
1600                        .await
1601                        .unwrap();
1602                    let eps = ep.bound_sockets();
1603                    info!(me = %ep.node_id().fmt_short(), ipv4=%eps.0, ipv6=?eps.1, "server bound");
1604                    for i in 0..n_clients {
1605                        let now = Instant::now();
1606                        println!("[server] round {}", i + 1);
1607                        let incoming = ep.accept().await.unwrap();
1608                        let conn = incoming.await.unwrap();
1609                        let peer_id = get_remote_node_id(&conn).unwrap();
1610                        info!(%i, peer = %peer_id.fmt_short(), "accepted connection");
1611                        let (mut send, mut recv) = conn.accept_bi().await.unwrap();
1612                        let mut buf = vec![0u8; chunk_size];
1613                        for _i in 0..n_chunks_per_client {
1614                            recv.read_exact(&mut buf).await.unwrap();
1615                            send.write_all(&buf).await.unwrap();
1616                        }
1617                        send.finish().unwrap();
1618                        send.stopped().await.unwrap();
1619                        recv.read_to_end(0).await.unwrap();
1620                        info!(%i, peer = %peer_id.fmt_short(), "finished");
1621                        println!("[server] round {} done in {:?}", i + 1, now.elapsed());
1622                    }
1623                }
1624                .instrument(error_span!("server")),
1625            )
1626        };
1627        let abort_handle = server.abort_handle();
1628        let _server_guard = CallOnDrop::new(move || {
1629            abort_handle.abort();
1630        });
1631
1632        for i in 0..n_clients {
1633            let now = Instant::now();
1634            println!("[client] round {}", i + 1);
1635            let relay_map = relay_map.clone();
1636            let client_secret_key = SecretKey::generate_with_rng(&mut rng);
1637            let relay_url = relay_url.clone();
1638            async {
1639                info!("client binding");
1640                let ep = Endpoint::builder()
1641                    .alpns(vec![TEST_ALPN.to_vec()])
1642                    .insecure_skip_relay_cert_verify(true)
1643                    .relay_mode(RelayMode::Custom(relay_map))
1644                    .secret_key(client_secret_key)
1645                    .bind()
1646                    .await
1647                    .unwrap();
1648                let eps = ep.bound_sockets();
1649                info!(me = %ep.node_id().fmt_short(), ipv4=%eps.0, ipv6=?eps.1, "client bound");
1650                let node_addr = NodeAddr::new(server_node_id).with_relay_url(relay_url);
1651                info!(to = ?node_addr, "client connecting");
1652                let conn = ep.connect(node_addr, TEST_ALPN).await.unwrap();
1653                info!("client connected");
1654                let (mut send, mut recv) = conn.open_bi().await.unwrap();
1655
1656                for i in 0..n_chunks_per_client {
1657                    let mut buf = vec![i; chunk_size];
1658                    send.write_all(&buf).await.unwrap();
1659                    recv.read_exact(&mut buf).await.unwrap();
1660                    assert_eq!(buf, vec![i; chunk_size]);
1661                }
1662                send.finish().unwrap();
1663                send.stopped().await.unwrap();
1664                recv.read_to_end(0).await.unwrap();
1665                info!("client finished");
1666                ep.close(0u32.into(), &[]).await.unwrap();
1667                info!("client closed");
1668            }
1669            .instrument(error_span!("client", %i))
1670            .await;
1671            println!("[client] round {} done in {:?}", i + 1, now.elapsed());
1672        }
1673
1674        server.await.unwrap();
1675
1676        // We appear to have seen this being very slow at times.  So ensure we fail if this
1677        // test is too slow.  We're only making two connections transferring very little
1678        // data, this really shouldn't take long.
1679        if start.elapsed() > Duration::from_secs(15) {
1680            panic!("Test too slow, something went wrong");
1681        }
1682    }
1683
1684    #[tokio::test]
1685    async fn endpoint_bidi_send_recv() {
1686        let _logging_guard = iroh_test::logging::setup();
1687        let ep1 = Endpoint::builder()
1688            .alpns(vec![TEST_ALPN.to_vec()])
1689            .relay_mode(RelayMode::Disabled)
1690            .bind()
1691            .await
1692            .unwrap();
1693        let ep2 = Endpoint::builder()
1694            .alpns(vec![TEST_ALPN.to_vec()])
1695            .relay_mode(RelayMode::Disabled)
1696            .bind()
1697            .await
1698            .unwrap();
1699        let ep1_nodeaddr = ep1.node_addr().await.unwrap();
1700        let ep2_nodeaddr = ep2.node_addr().await.unwrap();
1701        ep1.add_node_addr(ep2_nodeaddr.clone()).unwrap();
1702        ep2.add_node_addr(ep1_nodeaddr.clone()).unwrap();
1703        let ep1_nodeid = ep1.node_id();
1704        let ep2_nodeid = ep2.node_id();
1705        eprintln!("node id 1 {ep1_nodeid}");
1706        eprintln!("node id 2 {ep2_nodeid}");
1707
1708        async fn connect_hello(ep: Endpoint, dst: NodeAddr) {
1709            let conn = ep.connect(dst, TEST_ALPN).await.unwrap();
1710            let (mut send, mut recv) = conn.open_bi().await.unwrap();
1711            info!("sending hello");
1712            send.write_all(b"hello").await.unwrap();
1713            send.finish().unwrap();
1714            info!("receiving world");
1715            let m = recv.read_to_end(100).await.unwrap();
1716            assert_eq!(m, b"world");
1717            conn.close(1u8.into(), b"done");
1718        }
1719
1720        async fn accept_world(ep: Endpoint, src: NodeId) {
1721            let incoming = ep.accept().await.unwrap();
1722            let mut iconn = incoming.accept().unwrap();
1723            let alpn = iconn.alpn().await.unwrap();
1724            let conn = iconn.await.unwrap();
1725            let node_id = get_remote_node_id(&conn).unwrap();
1726            assert_eq!(node_id, src);
1727            assert_eq!(alpn, TEST_ALPN);
1728            let (mut send, mut recv) = conn.accept_bi().await.unwrap();
1729            info!("receiving hello");
1730            let m = recv.read_to_end(100).await.unwrap();
1731            assert_eq!(m, b"hello");
1732            info!("sending hello");
1733            send.write_all(b"world").await.unwrap();
1734            send.finish().unwrap();
1735            match conn.closed().await {
1736                ConnectionError::ApplicationClosed(closed) => {
1737                    assert_eq!(closed.error_code, 1u8.into());
1738                }
1739                _ => panic!("wrong close error"),
1740            }
1741        }
1742
1743        let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_nodeid).instrument(info_span!(
1744            "p1_accept",
1745            ep1 = %ep1.node_id().fmt_short(),
1746            dst = %ep2_nodeid.fmt_short(),
1747        )));
1748        let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_nodeid).instrument(info_span!(
1749            "p2_accept",
1750            ep2 = %ep2.node_id().fmt_short(),
1751            dst = %ep1_nodeid.fmt_short(),
1752        )));
1753        let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_nodeaddr).instrument(
1754            info_span!(
1755                "p1_connect",
1756                ep1 = %ep1.node_id().fmt_short(),
1757                dst = %ep2_nodeid.fmt_short(),
1758            ),
1759        ));
1760        let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_nodeaddr).instrument(
1761            info_span!(
1762                "p2_connect",
1763                ep2 = %ep2.node_id().fmt_short(),
1764                dst = %ep1_nodeid.fmt_short(),
1765            ),
1766        ));
1767
1768        p1_accept.await.unwrap();
1769        p2_accept.await.unwrap();
1770        p1_connect.await.unwrap();
1771        p2_connect.await.unwrap();
1772    }
1773
1774    #[tokio::test]
1775    async fn endpoint_conn_type_stream() {
1776        const TIMEOUT: Duration = std::time::Duration::from_secs(15);
1777        let _logging_guard = iroh_test::logging::setup();
1778        let (relay_map, _relay_url, _relay_guard) = run_relay_server().await.unwrap();
1779        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
1780        let ep1_secret_key = SecretKey::generate_with_rng(&mut rng);
1781        let ep2_secret_key = SecretKey::generate_with_rng(&mut rng);
1782        let ep1 = Endpoint::builder()
1783            .secret_key(ep1_secret_key)
1784            .insecure_skip_relay_cert_verify(true)
1785            .alpns(vec![TEST_ALPN.to_vec()])
1786            .relay_mode(RelayMode::Custom(relay_map.clone()))
1787            .bind()
1788            .await
1789            .unwrap();
1790        let ep2 = Endpoint::builder()
1791            .secret_key(ep2_secret_key)
1792            .insecure_skip_relay_cert_verify(true)
1793            .alpns(vec![TEST_ALPN.to_vec()])
1794            .relay_mode(RelayMode::Custom(relay_map))
1795            .bind()
1796            .await
1797            .unwrap();
1798
1799        async fn handle_direct_conn(ep: &Endpoint, node_id: PublicKey) -> Result<()> {
1800            let mut stream = ep.conn_type_stream(node_id)?;
1801            let src = ep.node_id().fmt_short();
1802            let dst = node_id.fmt_short();
1803            while let Some(conn_type) = stream.next().await {
1804                tracing::info!(me = %src, dst = %dst, conn_type = ?conn_type);
1805                if matches!(conn_type, ConnectionType::Direct(_)) {
1806                    return Ok(());
1807                }
1808            }
1809            anyhow::bail!("conn_type stream ended before `ConnectionType::Direct`");
1810        }
1811
1812        async fn accept(ep: &Endpoint) -> NodeId {
1813            let incoming = ep.accept().await.unwrap();
1814            let conn = incoming.await.unwrap();
1815            let node_id = get_remote_node_id(&conn).unwrap();
1816            tracing::info!(node_id=%node_id.fmt_short(), "accepted connection");
1817            node_id
1818        }
1819
1820        let ep1_nodeid = ep1.node_id();
1821        let ep2_nodeid = ep2.node_id();
1822
1823        let ep1_nodeaddr = ep1.node_addr().await.unwrap();
1824        tracing::info!(
1825            "node id 1 {ep1_nodeid}, relay URL {:?}",
1826            ep1_nodeaddr.relay_url()
1827        );
1828        tracing::info!("node id 2 {ep2_nodeid}");
1829
1830        let ep1_side = async move {
1831            accept(&ep1).await;
1832            handle_direct_conn(&ep1, ep2_nodeid).await
1833        };
1834
1835        let ep2_side = async move {
1836            ep2.connect(ep1_nodeaddr, TEST_ALPN).await.unwrap();
1837            handle_direct_conn(&ep2, ep1_nodeid).await
1838        };
1839
1840        let res_ep1 = tokio::spawn(tokio::time::timeout(TIMEOUT, ep1_side));
1841
1842        let ep1_abort_handle = res_ep1.abort_handle();
1843        let _ep1_guard = CallOnDrop::new(move || {
1844            ep1_abort_handle.abort();
1845        });
1846
1847        let res_ep2 = tokio::spawn(tokio::time::timeout(TIMEOUT, ep2_side));
1848        let ep2_abort_handle = res_ep2.abort_handle();
1849        let _ep2_guard = CallOnDrop::new(move || {
1850            ep2_abort_handle.abort();
1851        });
1852
1853        let (r1, r2) = tokio::try_join!(res_ep1, res_ep2).unwrap();
1854        r1.expect("ep1 timeout").unwrap();
1855        r2.expect("ep2 timeout").unwrap();
1856    }
1857
1858    #[tokio::test]
1859    async fn test_direct_addresses_no_stun_relay() {
1860        let _guard = iroh_test::logging::setup();
1861        let (relay_map, _, _guard) = run_relay_server_with(None).await.unwrap();
1862
1863        let ep = Endpoint::builder()
1864            .alpns(vec![TEST_ALPN.to_vec()])
1865            .relay_mode(RelayMode::Custom(relay_map))
1866            .insecure_skip_relay_cert_verify(true)
1867            .bind()
1868            .await
1869            .unwrap();
1870
1871        tokio::time::timeout(Duration::from_secs(10), ep.direct_addresses().next())
1872            .await
1873            .unwrap()
1874            .unwrap();
1875    }
1876}