Skip to main content

iroh/
endpoint.rs

1//! The [`Endpoint`] allows establishing connections to other iroh endpoints.
2//!
3//! The [`Endpoint`] is the main API interface to manage a local iroh endpoint.  It allows
4//! connecting to and accepting connections from other endpoints.  See the [module docs] for
5//! more details on how iroh connections work.
6//!
7//! The main items in this module are:
8//!
9//! - [`Endpoint`] to establish iroh connections with other endpoints.
10//! - [`Builder`] to create an [`Endpoint`].
11//!
12//! [module docs]: crate
13
14#[cfg(not(wasm_browser))]
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
19use iroh_relay::{RelayConfig, RelayMap};
20#[cfg(not(wasm_browser))]
21use n0_error::bail;
22use n0_error::{e, ensure, stack_error};
23use n0_watcher::Watcher;
24#[cfg(not(wasm_browser))]
25use netdev::ipnet::{Ipv4Net, Ipv6Net};
26use tracing::{debug, instrument, trace, warn};
27use url::Url;
28
29use self::hooks::EndpointHooksList;
30pub use super::socket::{
31    BindError, DirectAddr, DirectAddrType, PathInfo,
32    remote_map::{PathInfoList, RemoteInfo, Source, TransportAddrInfo, TransportAddrUsage},
33};
34#[cfg(wasm_browser)]
35use crate::address_lookup::PkarrResolver;
36#[cfg(not(wasm_browser))]
37use crate::dns::DnsResolver;
38use crate::{
39    NetReport,
40    address_lookup::{
41        ConcurrentAddressLookup, DynIntoAddressLookup, Error as AddressLookupError,
42        IntoAddressLookup, UserData,
43    },
44    endpoint::presets::Preset,
45    metrics::EndpointMetrics,
46    socket::{self, Handle, RemoteStateActorStoppedError, mapped_addrs::MappedAddr},
47    tls::{self, DEFAULT_MAX_TLS_TICKETS},
48};
49
50#[cfg(not(wasm_browser))]
51mod bind;
52mod connection;
53pub(crate) mod hooks;
54pub mod presets;
55pub(crate) mod quic;
56
57#[cfg(not(wasm_browser))]
58pub use bind::{BindOpts, InvalidSocketAddr, ToSocketAddr};
59pub use hooks::{AfterHandshakeOutcome, BeforeConnectOutcome, EndpointHooks};
60
61#[cfg(feature = "qlog")]
62pub use self::quic::{QlogConfig, QlogFactory, QlogFileFactory};
63pub use self::{
64    connection::{
65        Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
66        ConnectionInfo, ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt,
67        IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection,
68        RemoteEndpointIdError, RetryError, ZeroRttStatus,
69    },
70    quic::{
71        AcceptBi, AcceptUni, AckFrequencyConfig, AeadKey, ApplicationClose, Chunk, ClosedStream,
72        ConnectionClose, ConnectionError, ConnectionStats, Controller, ControllerFactory,
73        ControllerMetrics, CryptoError, Dir, ExportKeyingMaterialError, FrameStats, FrameType,
74        HandshakeTokenKey, HeaderKey, IdleTimeout, Keys, MtuDiscoveryConfig, OpenBi, OpenUni,
75        PacketKey, PathId, PathStats, QuicConnectError, QuicTransportConfig,
76        QuicTransportConfigBuilder, ReadDatagram, ReadError, ReadExactError, ReadToEndError,
77        RecvStream, ResetError, RttEstimator, SendDatagram, SendDatagramError, SendStream,
78        ServerConfig, ServerConfigBuilder, Side, StoppedError, StreamId, TimeSource, TokenLog,
79        TokenReuseError, TransportError, TransportErrorCode, TransportParameters, UdpStats,
80        UnorderedRecvStream, UnsupportedVersion, ValidationTokenConfig, VarInt,
81        VarIntBoundsExceeded, WriteError, Written,
82    },
83};
84#[cfg(not(wasm_browser))]
85use crate::socket::transports::IpConfig;
86use crate::socket::transports::TransportConfig;
87
88/// Builder for [`Endpoint`].
89///
90/// By default the endpoint will generate a new random [`SecretKey`], which will result in a
91/// new [`EndpointId`].
92///
93/// To create the [`Endpoint`] call [`Builder::bind`].
94#[derive(Debug)]
95pub struct Builder {
96    secret_key: Option<SecretKey>,
97    alpn_protocols: Vec<Vec<u8>>,
98    transport_config: QuicTransportConfig,
99    keylog: bool,
100    address_lookup: Vec<Box<dyn DynIntoAddressLookup>>,
101    address_lookup_user_data: Option<UserData>,
102    proxy_url: Option<Url>,
103    #[cfg(not(wasm_browser))]
104    dns_resolver: Option<DnsResolver>,
105    #[cfg(any(test, feature = "test-utils"))]
106    insecure_skip_relay_cert_verify: bool,
107    transports: Vec<TransportConfig>,
108    max_tls_tickets: usize,
109    hooks: EndpointHooksList,
110}
111
112impl From<RelayMode> for Option<TransportConfig> {
113    fn from(mode: RelayMode) -> Self {
114        match mode {
115            RelayMode::Disabled => None,
116            RelayMode::Default => Some(TransportConfig::Relay {
117                relay_map: mode.relay_map(),
118                is_user_defined: true,
119            }),
120            RelayMode::Staging => Some(TransportConfig::Relay {
121                relay_map: mode.relay_map(),
122                is_user_defined: true,
123            }),
124            RelayMode::Custom(relay_map) => Some(TransportConfig::Relay {
125                relay_map,
126                is_user_defined: true,
127            }),
128        }
129    }
130}
131
132impl Builder {
133    // The ordering of public methods is reflected directly in the documentation.  This is
134    // roughly ordered by what is most commonly needed by users.
135
136    /// Creates a new [`Builder`] using the given [`Preset`].
137    ///
138    /// See [`presets`] for more.
139    pub fn new<P: Preset>(preset: P) -> Self {
140        Self::empty(RelayMode::Disabled).preset(preset)
141    }
142
143    /// Applies the given [`Preset`].
144    pub fn preset<P: Preset>(mut self, preset: P) -> Self {
145        self = preset.apply(self);
146        self
147    }
148
149    /// Creates an empty builder with no address lookup  services.
150    pub fn empty(relay_mode: RelayMode) -> Self {
151        let mut transports = vec![
152            #[cfg(not(wasm_browser))]
153            TransportConfig::default_ipv4(),
154            #[cfg(not(wasm_browser))]
155            TransportConfig::default_ipv6(),
156        ];
157        if let Some(relay) = relay_mode.into() {
158            transports.push(relay);
159        }
160        Self {
161            secret_key: Default::default(),
162            alpn_protocols: Default::default(),
163            transport_config: QuicTransportConfig::default(),
164            keylog: Default::default(),
165            address_lookup: Default::default(),
166            address_lookup_user_data: Default::default(),
167            proxy_url: None,
168            #[cfg(not(wasm_browser))]
169            dns_resolver: None,
170            #[cfg(any(test, feature = "test-utils"))]
171            insecure_skip_relay_cert_verify: false,
172            max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
173            transports,
174            hooks: Default::default(),
175        }
176    }
177
178    // # The final constructor that everyone needs.
179
180    /// Binds the endpoint.
181    pub async fn bind(self) -> Result<Endpoint, BindError> {
182        let mut rng = rand::rng();
183        let secret_key = self
184            .secret_key
185            .unwrap_or_else(move || SecretKey::generate(&mut rng));
186
187        let static_config = StaticConfig {
188            transport_config: self.transport_config.clone(),
189            tls_config: tls::TlsConfig::new(secret_key.clone(), self.max_tls_tickets),
190            keylog: self.keylog,
191        };
192        let server_config = static_config.create_server_config(self.alpn_protocols);
193
194        #[cfg(not(wasm_browser))]
195        let dns_resolver = self.dns_resolver.unwrap_or_default();
196
197        let metrics = EndpointMetrics::default();
198
199        let sock_opts = socket::Options {
200            transports: self.transports,
201            secret_key,
202            address_lookup_user_data: self.address_lookup_user_data,
203            proxy_url: self.proxy_url,
204            #[cfg(not(wasm_browser))]
205            dns_resolver,
206            server_config,
207            #[cfg(any(test, feature = "test-utils"))]
208            insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
209            metrics,
210            hooks: self.hooks,
211        };
212
213        let sock = socket::Socket::spawn(sock_opts).await?;
214        trace!("created socket");
215        debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");
216
217        let ep = Endpoint {
218            sock,
219            static_config: Arc::new(static_config),
220        };
221
222        // Add Address Lookup mechanisms
223        for create_service in self.address_lookup {
224            let service = create_service.into_address_lookup(&ep)?;
225            ep.address_lookup().add_boxed(service);
226        }
227
228        Ok(ep)
229    }
230
231    // # The very common methods everyone basically needs.
232
233    /// Binds an IP socket at the provided socket address.
234    ///
235    /// This is an advanced API to tightly control the sockets used by the endpoint. Most
236    /// uses do not need to explicitly bind sockets.
237    ///
238    /// # Warning
239    ///
240    /// - The builder always comes pre-configured with an IPv4 socket to be bound on the
241    ///   *unspecified* address: `0.0.0.0`. This is the equivalent of using `INADDR_ANY`
242    ///   special bind address and results in a socket listening on *all* interfaces
243    ///   available.
244    ///
245    /// - Likewise the builder always comes pre-configured with an IPv6 socket to be bound
246    ///   on the *unspecified* address: `[::]`. This bind is allowed to fail however.
247    ///
248    /// - Adding a bind address removes the pre-configured unspecified bind address for this
249    ///   address family. Use [`Self::bind_addr_with_opts`] to bind additional addresses without
250    ///   replacing the default bind address.
251    ///
252    /// - This should be called at most once for each address family: once for IPv4 and/or
253    ///   once for IPv6. Calling it multiple times for the same address family will result
254    ///   in undefined routing behaviour. To bind multiple sockets of the same address
255    ///   family, use [`Self::bind_addr_with_opts`].
256    ///
257    /// # Description
258    ///
259    /// Requests a socket to be bound on a specific address, with an implied netmask of
260    /// `/0`. This allows restricting binding to only one network interface for a given
261    /// address family.
262    ///
263    /// If the port specified is already in use, binding the endpoint will fail. Using
264    /// port `0` in the socket address assigns a random free port.
265    ///
266    /// # Example
267    ///
268    /// ```no_run
269    /// # #[tokio::main]
270    /// # async fn main() -> n0_error::Result<()> {
271    /// # use iroh::Endpoint;
272    /// let endpoint = Endpoint::builder()
273    ///     .clear_ip_transports()
274    ///     .bind_addr("127.0.0.1:0")?
275    ///     .bind_addr("[::1]:0")?
276    ///     .bind()
277    ///     .await?;
278    /// # Ok(()) }
279    /// ```
280    #[cfg(not(wasm_browser))]
281    pub fn bind_addr<A>(self, addr: A) -> Result<Self, InvalidSocketAddr>
282    where
283        A: ToSocketAddr,
284        <A as ToSocketAddr>::Err: Into<InvalidSocketAddr>,
285    {
286        self.bind_addr_with_opts(addr, BindOpts::default())
287    }
288
289    /// Binds an IP socket at the provided socket address.
290    ///
291    /// This is an advanced API to tightly control the sockets used by the endpoint. Most
292    /// uses do not need to explicitly bind sockets.
293    ///
294    /// # Warning
295    ///
296    /// - The builder always comes pre-configured with an IPv4 socket to be bound on the
297    ///   *unspecified* address: `0.0.0.0`. This is the equivalent of using `INADDR_ANY`
298    ///   special bind address and results in a socket listening on *all* interfaces
299    ///   available.
300    ///
301    /// - Likewise the builder always comes pre-configured with an IPv6 socket to be bound
302    ///   on the *unspecified* address: `[::]`. This bind is allowed to fail however.
303    ///
304    /// # Description
305    ///
306    /// Requests a socket to be bound on a specific address. This allows restricting binding
307    /// to only one network interface for a given address family.
308    ///
309    /// [`BindOpts::set_prefix_len`] **should** be used to configure the netmask of the
310    /// network interface. This allows outgoing datagrams that start a new network flow to
311    /// be sent over the socket which is attached to the subnet of the destination
312    /// address. If multiple sockets are bound the standard routing-table semantics are
313    /// used: the socket attached to the subnet with the longest prefix matching the
314    /// destination is used. Practically this means the smallest subnets are at the top of
315    /// the routing table, and the first subnet containing the destination address is
316    /// chosen.
317    ///
318    /// If no socket is bound to a subnet that contains the destination address, the notion
319    /// of "default route" is used. At most one socket per address family may be marked as
320    /// the default route using [`BindOpts::set_is_default_route`], and this will be used
321    /// for destinations not contained by the subnets of the bound sockets. This network is
322    /// expected to have a default gateway configured. A socket with a prefix length of `/0`
323    /// will be set as a "default route" implicitly, unless [`BindOpts::set_is_default_route`]
324    /// is set to `false` explicitly.
325    ///
326    /// Be aware that using a subnet with a prefix length of `/0` will always contain all
327    /// destination addresses. It is valid to configure this, but no more than one such
328    /// socket should be bound or the routing will be non-deterministic.
329    ///
330    /// To use a subnet with a non-zero prefix length as the default route in addition to
331    /// being routed when its prefix matches, use [`BindOpts::set_is_default_route].
332    /// Subnets with a prefix length of zero are always marked as default routes.
333    ///
334    /// Finally note that most outgoing datagrams are part of an existing network flow. That
335    /// is, they are in response to an incoming datagram. In this case the outgoing datagram
336    /// will be sent over the same socket as the incoming datagram was received on, and the
337    /// routing with the prefix length and default route as described above does not apply.
338    ///
339    /// Using port `0` in the socket address assigns a random free port.
340    ///
341    /// If the port specified is already in use, binding the endpoint will fail, unless
342    /// [`BindOpts::set_is_required`] is set to `false`
343    ///
344    /// # Example
345    /// ```no_run
346    /// # #[tokio::main]
347    /// # async fn main() -> n0_error::Result<()> {
348    /// # use iroh::{Endpoint, endpoint::BindOpts};
349    /// let endpoint = Endpoint::builder()
350    ///     .clear_ip_transports()
351    ///     .bind_addr_with_opts("127.0.0.1:0", BindOpts::default().set_prefix_len(24))?
352    ///     .bind_addr_with_opts("[::1]:0", BindOpts::default().set_prefix_len(48))?
353    ///     .bind()
354    ///     .await?;
355    /// # Ok(()) }
356    /// ```
357    #[cfg(not(wasm_browser))]
358    pub fn bind_addr_with_opts<A>(
359        mut self,
360        addr: A,
361        opts: BindOpts,
362    ) -> Result<Self, InvalidSocketAddr>
363    where
364        A: ToSocketAddr,
365        <A as ToSocketAddr>::Err: Into<InvalidSocketAddr>,
366    {
367        let addr = addr.to_socket_addr().map_err(Into::into)?;
368        match addr {
369            SocketAddr::V4(addr) => {
370                if self
371                    .transports
372                    .iter()
373                    .any(|t| t.is_ipv4_default() && t.is_user_defined())
374                {
375                    bail!(InvalidSocketAddr::DuplicateDefaultAddr);
376                }
377
378                let ip_net = Ipv4Net::new(*addr.ip(), opts.prefix_len())?;
379                self.transports.push(TransportConfig::Ip {
380                    config: IpConfig::V4 {
381                        ip_net,
382                        port: addr.port(),
383                        is_required: opts.is_required(),
384                        is_default: opts.is_default_route(),
385                    },
386                    is_user_defined: true,
387                });
388            }
389            SocketAddr::V6(addr) => {
390                if self
391                    .transports
392                    .iter()
393                    .any(|t| t.is_ipv6_default() && t.is_user_defined())
394                {
395                    bail!(InvalidSocketAddr::DuplicateDefaultAddr);
396                }
397
398                let ip_net = Ipv6Net::new(*addr.ip(), opts.prefix_len())?;
399                self.transports.push(TransportConfig::Ip {
400                    config: IpConfig::V6 {
401                        ip_net,
402                        scope_id: addr.scope_id(),
403                        port: addr.port(),
404                        is_required: opts.is_required(),
405                        is_default: opts.is_default_route(),
406                    },
407                    is_user_defined: true,
408                });
409            }
410        }
411        Ok(self)
412    }
413
414    /// Removes all IP based transports.
415    #[cfg(not(wasm_browser))]
416    pub fn clear_ip_transports(mut self) -> Self {
417        self.transports
418            .retain(|t| !matches!(t, TransportConfig::Ip { .. }));
419        self
420    }
421
422    /// Removes all relay based transports.
423    pub fn clear_relay_transports(mut self) -> Self {
424        self.transports
425            .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
426        self
427    }
428
429    /// Sets a secret key to authenticate with other peers.
430    ///
431    /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus
432    /// also its [`EndpointId`]
433    ///
434    /// If not set, a new secret key will be generated.
435    ///
436    /// [`PublicKey`]: iroh_base::PublicKey
437    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
438        self.secret_key = Some(secret_key);
439        self
440    }
441
442    /// Sets the [ALPN] protocols that this endpoint will accept on incoming connections.
443    ///
444    /// Not setting this will still allow creating connections, but to accept incoming
445    /// connections at least one [ALPN] must be set.
446    ///
447    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
448    pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
449        self.alpn_protocols = alpn_protocols;
450        self
451    }
452
453    // # Methods for common customisation items.
454
455    /// Sets the relay servers to assist in establishing connectivity.
456    ///
457    /// Relay servers are used to establish initial connection with another iroh endpoint.
458    /// They also perform various functions related to hole punching, see the [crate docs]
459    /// for more details.
460    ///
461    /// By default the [number 0] relay servers are used, see [`RelayMode::Default`].
462    ///
463    /// When using [RelayMode::Custom], the provided `relay_map` must contain at least one
464    /// configured relay endpoint.  If an invalid RelayMap is provided [`bind`]
465    /// will result in an error.
466    ///
467    /// [`bind`]: Builder::bind
468    /// [crate docs]: crate
469    /// [number 0]: https://n0.computer
470    pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
471        let transport: Option<_> = relay_mode.into();
472        match transport {
473            Some(transport) => {
474                if let Some(og) = self
475                    .transports
476                    .iter_mut()
477                    .find(|t| matches!(t, TransportConfig::Relay { .. }))
478                {
479                    *og = transport;
480                } else {
481                    self.transports.push(transport);
482                }
483            }
484            None => {
485                self.transports
486                    .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
487            }
488        }
489        self
490    }
491
492    /// Removes all Address Lookup services from the builder.
493    ///
494    /// If no Address Lookup is set, connecting to an endpoint without providing its
495    /// direct addresses or relay URLs will fail.
496    ///
497    /// See the documentation of the [`crate::address_lookup::AddressLookup`] trait for details.
498    pub fn clear_address_lookup(mut self) -> Self {
499        self.address_lookup.clear();
500        self
501    }
502
503    /// Adds an additional Address Lookup for this endpoint.
504    ///
505    /// Once the endpoint is created the provided [`IntoAddressLookup::into_address_lookup`] will be
506    /// called. This allows Address Lookup's to finalize their configuration by e.g. using
507    /// the secret key from the endpoint which can be needed to sign published information.
508    ///
509    /// This method can be called multiple times and all the Address Lookup's passed in
510    /// will be combined using an internal instance of the
511    /// [`crate::address_lookup::ConcurrentAddressLookup`]. To clear all Address Lookup's, use
512    /// [`Self::clear_address_lookup`].
513    ///
514    /// If no Address Lookup is set, connecting to an endpoint without providing its
515    /// direct addresses or relay URLs will fail.
516    ///
517    /// See the documentation of the [`crate::address_lookup::AddressLookup`] trait for details.
518    pub fn address_lookup(mut self, address_lookup: impl IntoAddressLookup) -> Self {
519        self.address_lookup.push(Box::new(address_lookup));
520        self
521    }
522
523    /// Sets the initial user-defined data to be published in Address Lookup's for this node.
524    ///
525    /// When using Address Lookup's, this string of [`UserData`] will be published together
526    /// with the endpoint's addresses and relay URL. When other endpoints discover this endpoint,
527    /// they retrieve the [`UserData`] in addition to the addressing info.
528    ///
529    /// Iroh itself does not interpret the user-defined data in any way, it is purely left
530    /// for applications to parse and use.
531    pub fn user_data_for_address_lookup(mut self, user_data: UserData) -> Self {
532        self.address_lookup_user_data = Some(user_data);
533        self
534    }
535
536    // # Methods for more specialist customisation.
537
538    /// Sets a custom [`QuicTransportConfig`] for this endpoint.
539    ///
540    /// The transport config contains parameters governing the QUIC state machine.
541    ///
542    /// If unset, the default config is used. Default values should be suitable for most
543    /// internet applications. Applications protocols which forbid remotely-initiated
544    /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
545    /// zero.
546    ///
547    /// Please be aware that changing some settings may have adverse effects on establishing
548    /// and maintaining direct connections.
549    pub fn transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
550        self.transport_config = transport_config;
551        self
552    }
553
554    /// Optionally sets a custom DNS resolver to use for this endpoint.
555    ///
556    /// The DNS resolver is used to resolve relay hostnames, and endpoint addresses if
557    /// [`crate::address_lookup::DnsAddressLookup`] is configured.
558    ///
559    /// By default, a new DNS resolver is created which is configured to use the
560    /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`]
561    /// here to use a differently configured DNS resolver for this endpoint, or to share
562    /// a [`DnsResolver`] between multiple endpoints.
563    #[cfg(not(wasm_browser))]
564    pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
565        self.dns_resolver = Some(dns_resolver);
566        self
567    }
568
569    /// Sets an explicit proxy url to proxy all HTTP(S) traffic through.
570    pub fn proxy_url(mut self, url: Url) -> Self {
571        self.proxy_url.replace(url);
572        self
573    }
574
575    /// Sets the proxy url from the environment, in this order:
576    ///
577    /// - `HTTP_PROXY`
578    /// - `http_proxy`
579    /// - `HTTPS_PROXY`
580    /// - `https_proxy`
581    pub fn proxy_from_env(mut self) -> Self {
582        self.proxy_url = proxy_url_from_env();
583        self
584    }
585
586    /// Enables saving the TLS pre-master key for connections.
587    ///
588    /// This key should normally remain secret but can be useful to debug networking issues
589    /// by decrypting captured traffic.
590    ///
591    /// If *keylog* is `true` then setting the `SSLKEYLOGFILE` environment variable to a
592    /// filename will result in this file being used to log the TLS pre-master keys.
593    pub fn keylog(mut self, keylog: bool) -> Self {
594        self.keylog = keylog;
595        self
596    }
597
598    /// Skip verification of SSL certificates from relay servers
599    ///
600    /// May only be used in tests.
601    #[cfg(any(test, feature = "test-utils"))]
602    pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
603        self.insecure_skip_relay_cert_verify = skip_verify;
604        self
605    }
606
607    /// Set the maximum number of TLS tickets to cache.
608    ///
609    /// Set this to a larger value if you want to do 0rtt connections to a large
610    /// number of clients.
611    ///
612    /// The default is 256, taking about 150 KiB in memory.
613    pub fn max_tls_tickets(mut self, n: usize) -> Self {
614        self.max_tls_tickets = n;
615        self
616    }
617
618    /// Install hooks onto the endpoint.
619    ///
620    /// Endpoint hooks intercept the connection establishment process of an [`Endpoint`].
621    ///
622    /// You can install multiple [`EndpointHooks`] by calling this function multiple times.
623    /// Order matters: hooks are invoked in the order they were installed onto the endpoint
624    /// builder. Once a hook returns reject, further processing
625    /// is aborted and other hooks won't be invoked.
626    ///
627    /// See [`EndpointHooks`] for details on the possible interception points in the connection lifecycle.
628    pub fn hooks(mut self, hooks: impl EndpointHooks + 'static) -> Self {
629        self.hooks.push(hooks);
630        self
631    }
632}
633
634/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.
635#[derive(Debug)]
636struct StaticConfig {
637    tls_config: tls::TlsConfig,
638    transport_config: QuicTransportConfig,
639    keylog: bool,
640}
641
642impl StaticConfig {
643    /// Create a [`ServerConfig`] with the specified ALPN protocols.
644    fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> quinn_proto::ServerConfig {
645        let quic_server_config = self
646            .tls_config
647            .make_server_config(alpn_protocols, self.keylog);
648        let mut inner = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
649        inner.transport_config(self.transport_config.to_inner_arc());
650        inner
651    }
652}
653
654/// Controls an iroh endpoint, establishing connections with other endpoints.
655///
656/// This is the main API interface to create connections to, and accept connections from
657/// other iroh endpoints.  The connections are peer-to-peer and encrypted, a Relay server is
658/// used to make the connections reliable.  See the [crate docs] for a more detailed
659/// overview of iroh.
660///
661/// It is recommended to only create a single instance per application.  This ensures all
662/// the connections made share the same peer-to-peer connections to other iroh endpoints,
663/// while still remaining independent connections.  This will result in more optimal network
664/// behaviour.
665///
666/// The endpoint is created using the [`Builder`], which can be created using
667/// [`Endpoint::builder`].
668///
669/// Once an endpoint exists, new connections are typically created using the
670/// [`Endpoint::connect`] and [`Endpoint::accept`] methods.  Once established, the
671/// [`Connection`] gives access to most [QUIC] features.  Individual streams to send data to
672/// the peer are created using the [`Connection::open_bi`], [`Connection::accept_bi`],
673/// [`Connection::open_uni`] and [`Connection::open_bi`] functions.
674///
675/// Note that due to the light-weight properties of streams a stream will only be accepted
676/// once the initiating peer has sent some data on it.
677///
678/// [QUIC]: https://quicwg.org
679#[derive(Clone, Debug)]
680pub struct Endpoint {
681    /// Handle to the socket/actor
682    pub(crate) sock: Handle,
683    /// Configuration structs for quinn, holds the transport config, certificate setup, secret key etc.
684    static_config: Arc<StaticConfig>,
685}
686
687#[allow(missing_docs)]
688#[stack_error(derive, add_meta, from_sources)]
689#[non_exhaustive]
690#[allow(private_interfaces)]
691pub enum ConnectWithOptsError {
692    #[error("Connecting to ourself is not supported")]
693    SelfConnect,
694    #[error("No addressing information available")]
695    NoAddress { source: AddressLookupError },
696    #[error("Unable to connect to remote")]
697    Quinn {
698        #[error(std_err)]
699        source: QuicConnectError,
700    },
701    #[error("Internal consistency error")]
702    InternalConsistencyError {
703        /// Private source type, cannot be created publicly.
704        source: RemoteStateActorStoppedError,
705    },
706    #[error("Connection was rejected locally")]
707    LocallyRejected,
708}
709
710#[allow(missing_docs)]
711#[stack_error(derive, add_meta, from_sources)]
712#[non_exhaustive]
713pub enum ConnectError {
714    #[error(transparent)]
715    Connect { source: ConnectWithOptsError },
716    #[error(transparent)]
717    Connecting { source: ConnectingError },
718    #[error(transparent)]
719    Connection {
720        #[error(std_err)]
721        source: ConnectionError,
722    },
723}
724
725impl Endpoint {
726    // The ordering of public methods is reflected directly in the documentation.  This is
727    // roughly ordered by what is most commonly needed by users, but grouped in similar
728    // items.
729
730    // # Methods relating to construction.
731
732    /// Returns the builder for an [`Endpoint`], with a production configuration.
733    ///
734    /// This uses the [`presets::N0`] as the configuration.
735    pub fn builder() -> Builder {
736        Builder::new(presets::N0)
737    }
738
739    /// Returns the builder for an [`Endpoint`], with an empty configuration.
740    ///
741    /// See [`Builder::empty`] for details.
742    pub fn empty_builder(relay_mode: RelayMode) -> Builder {
743        Builder::empty(relay_mode)
744    }
745
746    /// Constructs a default [`Endpoint`] and binds it immediately.
747    ///
748    /// Uses the [`presets::N0`] as configuration.
749    pub async fn bind() -> Result<Self, BindError> {
750        Self::builder().bind().await
751    }
752
753    /// Sets the list of accepted ALPN protocols.
754    ///
755    /// This will only affect new incoming connections.
756    /// Note that this *overrides* the current list of ALPNs.
757    pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) {
758        let server_config = self.static_config.create_server_config(alpns);
759        self.sock.endpoint().set_server_config(Some(server_config));
760    }
761
762    /// Adds the provided configuration to the [`RelayMap`].
763    ///
764    /// Replacing and returning any existing configuration for [`RelayUrl`].
765    pub async fn insert_relay(
766        &self,
767        relay: RelayUrl,
768        config: Arc<RelayConfig>,
769    ) -> Option<Arc<RelayConfig>> {
770        self.sock.insert_relay(relay, config).await
771    }
772
773    /// Removes the configuration from the [`RelayMap`] for the provided [`RelayUrl`].
774    ///
775    /// Returns any existing configuration.
776    pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayConfig>> {
777        self.sock.remove_relay(relay).await
778    }
779
780    // # Methods for establishing connectivity.
781
782    /// Connects to a remote [`Endpoint`].
783    ///
784    /// A value that can be converted into an [`EndpointAddr`] is required. This can be either an
785    /// [`EndpointAddr`] or an [`EndpointId`].
786    ///
787    /// The [`EndpointAddr`] must contain the [`EndpointId`] to dial and may also contain a [`RelayUrl`]
788    /// and direct addresses. If direct addresses are provided, they will be used to try and
789    /// establish a direct connection without involving a relay server.
790    ///
791    /// If neither a [`RelayUrl`] or direct addresses are configured in the [`EndpointAddr`] it
792    /// may still be possible a connection can be established.  This depends on which, if any,
793    /// [`crate::address_lookup::AddressLookup`]s were configured using [`Builder::address_lookup`].  The Address Lookup
794    /// service will also be used if the remote endpoint is not reachable on the provided direct
795    /// addresses and there is no [`RelayUrl`].
796    ///
797    /// If addresses or relay servers are neither provided nor can be discovered, the
798    /// connection attempt will fail with an error.
799    ///
800    /// The `alpn`, or application-level protocol identifier, is also required. The remote
801    /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
802    /// an error.
803    ///
804    /// [`RelayUrl`]: crate::RelayUrl
805    pub async fn connect(
806        &self,
807        endpoint_addr: impl Into<EndpointAddr>,
808        alpn: &[u8],
809    ) -> Result<Connection, ConnectError> {
810        let endpoint_addr = endpoint_addr.into();
811        let remote = endpoint_addr.id;
812        let connecting = self
813            .connect_with_opts(endpoint_addr, alpn, Default::default())
814            .await?;
815        let conn = connecting.await?;
816
817        debug!(
818            me = %self.id().fmt_short(),
819            remote = %remote.fmt_short(),
820            alpn = %String::from_utf8_lossy(alpn),
821            "Connection established."
822        );
823        Ok(conn)
824    }
825
826    /// Starts a connection attempt with a remote [`Endpoint`].
827    ///
828    /// Like [`Endpoint::connect`] (see also its docs for general details), but allows for a more
829    /// advanced connection setup with more customization in two aspects:
830    /// 1. The returned future resolves to a [`Connecting`], which can be further processed into
831    ///    a [`Connection`] by awaiting, or alternatively allows connecting with 0-RTT via
832    ///    [`Connecting::into_0rtt`].
833    ///    **Note:** Please read the documentation for `into_0rtt` carefully to assess
834    ///    security concerns.
835    /// 2. The [`QuicTransportConfig`] for the connection can be modified via the provided
836    ///    [`ConnectOptions`].
837    ///    **Note:** Please be aware that changing transport config settings may have adverse effects on
838    ///    establishing and maintaining direct connections.  Carefully test settings you use and
839    ///    consider this currently as still rather experimental.
840    #[instrument(name = "connect", skip_all, fields(
841        me = %self.id().fmt_short(),
842        remote = tracing::field::Empty,
843        alpn = String::from_utf8_lossy(alpn).to_string(),
844    ))]
845    pub async fn connect_with_opts(
846        &self,
847        endpoint_addr: impl Into<EndpointAddr>,
848        alpn: &[u8],
849        options: ConnectOptions,
850    ) -> Result<Connecting, ConnectWithOptsError> {
851        let endpoint_addr: EndpointAddr = endpoint_addr.into();
852        if let BeforeConnectOutcome::Reject =
853            self.sock.hooks.before_connect(&endpoint_addr, alpn).await
854        {
855            return Err(e!(ConnectWithOptsError::LocallyRejected));
856        }
857        let endpoint_id = endpoint_addr.id;
858
859        tracing::Span::current().record("remote", tracing::field::display(endpoint_id.fmt_short()));
860
861        // Connecting to ourselves is not supported.
862        ensure!(endpoint_id != self.id(), ConnectWithOptsError::SelfConnect);
863
864        trace!(
865            dst_endpoint_id = %endpoint_id.fmt_short(),
866            relay_url = ?endpoint_addr.relay_urls().next().cloned(),
867            ip_addresses = ?endpoint_addr.ip_addrs().cloned().collect::<Vec<_>>(),
868            "connecting",
869        );
870
871        let mapped_addr = self.sock.resolve_remote(endpoint_addr).await??;
872
873        let transport_config = options
874            .transport_config
875            .map(|cfg| cfg.to_inner_arc())
876            .unwrap_or(self.static_config.transport_config.to_inner_arc());
877
878        // Start connecting via quinn. This will time out after 10 seconds if no reachable
879        // address is available.
880
881        let client_config = {
882            let mut alpn_protocols = vec![alpn.to_vec()];
883            alpn_protocols.extend(options.additional_alpns);
884            let quic_client_config = self
885                .static_config
886                .tls_config
887                .make_client_config(alpn_protocols, self.static_config.keylog);
888            let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
889            client_config.transport_config(transport_config.clone());
890            client_config
891        };
892
893        let dest_addr = mapped_addr.private_socket_addr();
894        let server_name = &tls::name::encode(endpoint_id);
895        let connect = self
896            .sock
897            .endpoint()
898            .connect_with(client_config, dest_addr, server_name)?;
899
900        Ok(Connecting::new(connect, self.clone(), endpoint_id))
901    }
902
903    /// Accepts an incoming connection on the endpoint.
904    ///
905    /// Only connections with the ALPNs configured in [`Builder::alpns`] will be accepted.
906    /// If multiple ALPNs have been configured the ALPN can be inspected before accepting
907    /// the connection using [`Connecting::alpn`].
908    ///
909    /// The returned future will yield `None` if the endpoint is closed by calling
910    /// [`Endpoint::close`].
911    pub fn accept(&self) -> Accept<'_> {
912        Accept {
913            inner: self.sock.endpoint().accept(),
914            ep: self.clone(),
915        }
916    }
917
918    // # Getter methods for properties of this Endpoint itself.
919
920    /// Returns the secret_key of this endpoint.
921    pub fn secret_key(&self) -> &SecretKey {
922        &self.static_config.tls_config.secret_key
923    }
924
925    /// Returns the endpoint id of this endpoint.
926    ///
927    /// This ID is the unique addressing information of this endpoint and other peers must know
928    /// it to be able to connect to this endpoint.
929    pub fn id(&self) -> EndpointId {
930        self.static_config.tls_config.secret_key.public()
931    }
932
933    /// Returns the current [`EndpointAddr`].
934    /// As long as the endpoint was able to bind to a network interface, some
935    /// local addresses will be available.
936    ///
937    /// The state of other fields depends on the state of networking and connectivity.
938    /// Use the [`Endpoint::online`] method to ensure that the endpoint is considered
939    /// "online" (has contacted a relay server) before calling this method, if you want
940    /// to ensure that the `EndpointAddr` will contain enough information to allow this endpoint
941    /// to be dialable by a remote endpoint over the internet.
942    ///
943    /// You can use the [`Endpoint::watch_addr`] method to get updates when the `EndpointAddr`
944    /// changes.
945    pub fn addr(&self) -> EndpointAddr {
946        self.watch_addr().get()
947    }
948
949    /// Returns a [`Watcher`] for the current [`EndpointAddr`] for this endpoint.
950    ///
951    /// The observed [`EndpointAddr`] will have the current [`RelayUrl`] and direct addresses.
952    ///
953    /// ```no_run
954    /// # async fn wrapper() -> n0_error::Result<()> {
955    /// use iroh::{Endpoint, Watcher};
956    ///
957    /// let endpoint = Endpoint::builder()
958    ///     .alpns(vec![b"my-alpn".to_vec()])
959    ///     .bind()
960    ///     .await?;
961    /// let endpoint_addr = endpoint.watch_addr().get();
962    /// # let _ = endpoint_addr;
963    /// # Ok(())
964    /// # }
965    /// ```
966    ///
967    /// The [`Endpoint::online`] method can be used as a convenience method to
968    /// understand if the endpoint has ever been considered "online". But after
969    /// that initial call to [`Endpoint::online`], to understand if your
970    /// endpoint is no longer able to be connected to by endpoints outside
971    /// of the private or local network, watch for changes in it's [`EndpointAddr`].
972    /// If there are no `addrs`in the [`EndpointAddr`], you may not be dialable by other endpoints
973    /// on the internet.
974    ///
975    ///
976    /// The `EndpointAddr` will change as:
977    /// - network conditions change
978    /// - the endpoint connects to a relay server
979    /// - the endpoint changes its preferred relay server
980    /// - more addresses are discovered for this endpoint
981    ///
982    /// [`RelayUrl`]: crate::RelayUrl
983    #[cfg(not(wasm_browser))]
984    pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
985        let watch_addrs = self.sock.ip_addrs();
986        let watch_relay = self.sock.home_relay();
987        let endpoint_id = self.id();
988
989        watch_addrs.or(watch_relay).map(move |(addrs, relays)| {
990            EndpointAddr::from_parts(
991                endpoint_id,
992                relays
993                    .into_iter()
994                    .map(TransportAddr::Relay)
995                    .chain(addrs.into_iter().map(|x| TransportAddr::Ip(x.addr))),
996            )
997        })
998    }
999
1000    /// Returns a [`Watcher`] for the current [`EndpointAddr`] for this endpoint.
1001    ///
1002    /// When compiled to Wasm, this function returns a watcher that initializes
1003    /// with an [`EndpointAddr`] that only contains a relay URL, but no direct addresses,
1004    /// as there are no APIs for directly using sockets in browsers.
1005    #[cfg(wasm_browser)]
1006    pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
1007        // In browsers, there will never be any direct addresses, so we wait
1008        // for the home relay instead. This makes the `EndpointAddr` have *some* way
1009        // of connecting to us.
1010        let watch_relay = self.sock.home_relay();
1011        let endpoint_id = self.id();
1012        watch_relay.map(move |mut relays| {
1013            EndpointAddr::from_parts(endpoint_id, relays.into_iter().map(TransportAddr::Relay))
1014        })
1015    }
1016
1017    /// A convenience method that waits for the endpoint to be considered "online".
1018    ///
1019    /// This currently means at least one relay server was connected,
1020    /// and at least one local IP address is available.
1021    /// Even if no relays are configured, this will still wait for a relay connection.
1022    ///
1023    /// Once this has been resolved the first time, this will always immediately resolve.
1024    ///
1025    /// This has no timeout, so if that is needed, you need to wrap it in a
1026    /// timeout. We recommend using a timeout close to
1027    /// [`crate::NET_REPORT_TIMEOUT`]s, so you can be sure that at least one
1028    /// [`crate::NetReport`] has been attempted.
1029    ///
1030    /// To understand if the endpoint has gone back "offline",
1031    /// you must use the [`Endpoint::watch_addr`] method, to
1032    /// get information on the current relay and direct address information.
1033    ///
1034    /// In the common case where the endpoint's configured relay servers are
1035    /// only accessible via a wide area network (WAN) connection, this method
1036    /// will await indefinitely when the endpoint has no WAN connection. If you're
1037    /// writing an app that's designed to work without a WAN connection, defer
1038    /// any calls to `online` as long as possible, or avoid calling `online`
1039    /// entirely.
1040    ///
1041    /// The online method does not interact with [`crate::address_lookup::AddressLookup`]
1042    /// services, which means that any Address Lookup that relies on a WAN
1043    /// connection is independent of the endpoint's online status.
1044    ///
1045    /// # Examples
1046    ///
1047    /// ```no run
1048    /// use iroh::Endpoint;
1049    ///
1050    /// #[tokio::main]
1051    /// async fn main() {
1052    /// // After this await returns, the endpoint is bound to a local socket.
1053    /// // It can be dialed, but almost certainly hasn't finished picking a
1054    /// // relay.
1055    /// let endpoint = Endpoint::bind().await;
1056    ///
1057    /// // After this await returns we have a connection to at least one relay
1058    /// // and holepunching should work as expected.
1059    /// endpoint.online().await;
1060    /// }
1061    /// ```
1062    pub async fn online(&self) {
1063        self.sock.home_relay().initialized().await;
1064    }
1065
1066    /// Returns a [`Watcher`] for any net-reports run from this [`Endpoint`].
1067    ///
1068    /// A `net-report` checks the network conditions of the [`Endpoint`], such as
1069    /// whether it is connected to the internet via Ipv4 and/or Ipv6, its NAT
1070    /// status, its latency to the relay servers, and its public addresses.
1071    ///
1072    /// The [`Endpoint`] continuously runs `net-reports` to monitor if network
1073    /// conditions have changed. This [`Watcher`] will return the latest result
1074    /// of the `net-report`.
1075    ///
1076    /// When issuing the first call to this method the first report might
1077    /// still be underway, in this case the [`Watcher`] might not be initialized
1078    /// with [`Some`] value yet.  Once the net-report has been successfully
1079    /// run, the [`Watcher`] will always return [`Some`] report immediately, which
1080    /// is the most recently run `net-report`.
1081    ///
1082    /// # Examples
1083    ///
1084    /// To get the first report use [`Watcher::initialized`]:
1085    /// ```no_run
1086    /// use iroh::{Endpoint, Watcher as _};
1087    ///
1088    /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
1089    /// # rt.block_on(async move {
1090    /// let ep = Endpoint::bind().await.unwrap();
1091    /// let _report = ep.net_report().initialized().await;
1092    /// # });
1093    /// ```
1094    #[doc(hidden)]
1095    pub fn net_report(&self) -> impl Watcher<Value = Option<NetReport>> + use<> {
1096        self.sock.net_report()
1097    }
1098
1099    /// Returns the local socket addresses on which the underlying sockets are bound.
1100    ///
1101    /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
1102    /// address if available.
1103    #[cfg(not(wasm_browser))]
1104    pub fn bound_sockets(&self) -> Vec<SocketAddr> {
1105        self.sock
1106            .local_addr()
1107            .into_iter()
1108            .filter_map(|addr| addr.into_socket_addr())
1109            .collect()
1110    }
1111
1112    // # Methods for less common getters.
1113    //
1114    // Partially they return things passed into the builder.
1115
1116    /// Returns the DNS resolver used in this [`Endpoint`].
1117    ///
1118    /// See [`Builder::dns_resolver`].
1119    #[cfg(not(wasm_browser))]
1120    pub fn dns_resolver(&self) -> &DnsResolver {
1121        self.sock.dns_resolver()
1122    }
1123
1124    /// Returns the Address Lookup service, if configured.
1125    ///
1126    /// See [`Builder::address_lookup`].
1127    pub fn address_lookup(&self) -> &ConcurrentAddressLookup {
1128        self.sock.address_lookup()
1129    }
1130
1131    /// Returns metrics collected for this endpoint.
1132    ///
1133    /// The endpoint internally collects various metrics about its operation.
1134    /// The returned [`EndpointMetrics`] struct contains all of these metrics.
1135    ///
1136    /// You can access individual metrics directly by using the public fields:
1137    /// ```rust
1138    /// # use std::collections::BTreeMap;
1139    /// # use iroh::endpoint::Endpoint;
1140    /// # async fn wrapper() -> n0_error::Result<()> {
1141    /// let endpoint = Endpoint::bind().await?;
1142    /// assert_eq!(endpoint.metrics().socket.recv_datagrams.get(), 0);
1143    /// # Ok(())
1144    /// # }
1145    /// ```
1146    ///
1147    /// [`EndpointMetrics`] implements [`MetricsGroupSet`], and each field
1148    /// implements [`MetricsGroup`]. These traits provide methods to iterate over
1149    /// the groups in the set, and over the individual metrics in each group, without having
1150    /// to access each field manually. With these methods, it is straightforward to collect
1151    /// all metrics into a map or push their values to a metrics collector.
1152    ///
1153    /// For example, the following snippet collects all metrics into a map:
1154    /// ```rust
1155    /// # use std::collections::BTreeMap;
1156    /// # use iroh_metrics::{Metric, MetricsGroup, MetricValue, MetricsGroupSet};
1157    /// # use iroh::endpoint::Endpoint;
1158    /// # async fn wrapper() -> n0_error::Result<()> {
1159    /// let endpoint = Endpoint::bind().await?;
1160    /// let metrics: BTreeMap<String, MetricValue> = endpoint
1161    ///     .metrics()
1162    ///     .iter()
1163    ///     .map(|(group, metric)| {
1164    ///         let name = [group, metric.name()].join(":");
1165    ///         (name, metric.value())
1166    ///     })
1167    ///     .collect();
1168    ///
1169    /// assert_eq!(metrics["socket:recv_datagrams"], MetricValue::Counter(0));
1170    /// # Ok(())
1171    /// # }
1172    /// ```
1173    ///
1174    /// The metrics can also be encoded into the OpenMetrics text format, as used by Prometheus.
1175    /// To do so, use the [`iroh_metrics::Registry`], add the endpoint metrics to the
1176    /// registry with [`Registry::register_all`], and encode the metrics to a string with
1177    /// [`encode_openmetrics_to_string`]:
1178    /// ```rust
1179    /// # use iroh_metrics::{Registry, MetricsSource};
1180    /// # use iroh::endpoint::Endpoint;
1181    /// # async fn wrapper() -> n0_error::Result<()> {
1182    /// let endpoint = Endpoint::bind().await?;
1183    /// let mut registry = Registry::default();
1184    /// registry.register_all(endpoint.metrics());
1185    /// let s = registry.encode_openmetrics_to_string()?;
1186    /// assert!(s.contains(r#"TYPE socket_recv_datagrams counter"#));
1187    /// assert!(s.contains(r#"socket_recv_datagrams_total 0"#));
1188    /// # Ok(())
1189    /// # }
1190    /// ```
1191    ///
1192    /// Through a registry, you can also add labels or prefixes to metrics with
1193    /// [`Registry::sub_registry_with_label`] or [`Registry::sub_registry_with_prefix`].
1194    /// Furthermore, [`iroh_metrics::service`] provides functions to easily start services
1195    /// to serve the metrics with a HTTP server, dump them to a file, or push them
1196    /// to a Prometheus gateway.
1197    ///
1198    /// For example, the following snippet launches an HTTP server that serves the metrics in the
1199    /// OpenMetrics text format:
1200    /// ```no_run
1201    /// # use std::{sync::{Arc, RwLock}, time::Duration};
1202    /// # use iroh_metrics::{Registry, MetricsSource};
1203    /// # use iroh::endpoint::Endpoint;
1204    /// # use n0_error::{StackResultExt, StdResultExt};
1205    /// # async fn wrapper() -> n0_error::Result<()> {
1206    /// // Create a registry, wrapped in a read-write lock so that we can register and serve
1207    /// // the metrics independently.
1208    /// let registry = Arc::new(RwLock::new(Registry::default()));
1209    /// // Spawn a task to serve the metrics on an OpenMetrics HTTP endpoint.
1210    /// let metrics_task = tokio::task::spawn({
1211    ///     let registry = registry.clone();
1212    ///     async move {
1213    ///         let addr = "0.0.0.0:9100".parse().unwrap();
1214    ///         iroh_metrics::service::start_metrics_server(addr, registry).await
1215    ///     }
1216    /// });
1217    ///
1218    /// // Spawn an endpoint and add the metrics to the registry.
1219    /// let endpoint = Endpoint::bind().await?;
1220    /// registry.write().unwrap().register_all(endpoint.metrics());
1221    ///
1222    /// // Wait for the metrics server to bind, then fetch the metrics via HTTP.
1223    /// tokio::time::sleep(Duration::from_millis(500));
1224    /// let res = reqwest::get("http://localhost:9100/metrics")
1225    ///     .await
1226    ///     .std_context("get")?
1227    ///     .text()
1228    ///     .await
1229    ///     .std_context("text")?;
1230    ///
1231    /// assert!(res.contains(r#"TYPE socket_recv_datagrams counter"#));
1232    /// assert!(res.contains(r#"socket_recv_datagrams_total 0"#));
1233    /// # metrics_task.abort();
1234    /// # Ok(())
1235    /// # }
1236    /// ```
1237    ///
1238    /// [`Registry`]: iroh_metrics::Registry
1239    /// [`Registry::register_all`]: iroh_metrics::Registry::register_all
1240    /// [`Registry::sub_registry_with_label`]: iroh_metrics::Registry::sub_registry_with_label
1241    /// [`Registry::sub_registry_with_prefix`]: iroh_metrics::Registry::sub_registry_with_prefix
1242    /// [`encode_openmetrics_to_string`]: iroh_metrics::MetricsSource::encode_openmetrics_to_string
1243    /// [`MetricsGroup`]: iroh_metrics::MetricsGroup
1244    /// [`MetricsGroupSet`]: iroh_metrics::MetricsGroupSet
1245    #[cfg(feature = "metrics")]
1246    pub fn metrics(&self) -> &EndpointMetrics {
1247        &self.sock.metrics
1248    }
1249
1250    /// Returns addressing information about a recently used remote endpoint.
1251    ///
1252    /// The returned [`RemoteInfo`] contains a list of all transport addresses for the remote
1253    /// that we know about. This is a snapshot in time and not a watcher.
1254    ///
1255    /// Returns `None` if the endpoint doesn't have information about the remote.
1256    /// When remote endpoints are no longer used, our endpoint will keep information around
1257    /// for a little while, and then drop it. Afterwards, this will return `None`.
1258    pub async fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
1259        self.sock.remote_info(endpoint_id).await
1260    }
1261
1262    // # Methods for less common state updates.
1263
1264    /// Notifies the system of potential network changes.
1265    ///
1266    /// On many systems iroh is able to detect network changes by itself, however
1267    /// some systems like android do not expose this functionality to native code.
1268    /// Android does however provide this functionality to Java code.  This
1269    /// function allows for notifying iroh of any potential network changes like
1270    /// this.
1271    ///
1272    /// Even when the network did not change, or iroh was already able to detect
1273    /// the network change itself, there is no harm in calling this function.
1274    pub async fn network_change(&self) {
1275        self.sock.network_change().await;
1276    }
1277
1278    // # Methods to update internal state.
1279
1280    /// Sets the initial user-defined data to be published in Address Lookups for this endpoint.
1281    ///
1282    /// If the user-defined data passed to this function is different to the previous one,
1283    /// the endpoint will republish its endpoint info to the configured Address Lookups.
1284    ///
1285    /// See also [`Builder::user_data_for_address_lookup`] for setting an initial value when
1286    /// building the endpoint.
1287    pub fn set_user_data_for_address_lookup(&self, user_data: Option<UserData>) {
1288        self.sock.set_user_data_for_address_lookup(user_data);
1289    }
1290
1291    // # Methods for terminating the endpoint.
1292
1293    /// Closes the QUIC endpoint and the socket.
1294    ///
1295    /// This will close any remaining open [`Connection`]s with an error code
1296    /// of `0` and an empty reason.  Though it is best practice to close those
1297    /// explicitly before with a custom error code and reason.
1298    ///
1299    /// It will then make a best effort to wait for all close notifications to be
1300    /// acknowledged by the peers, re-transmitting them if needed. This ensures the
1301    /// peers are aware of the closed connections instead of having to wait for a timeout
1302    /// on the connection. Once all connections are closed or timed out, the future
1303    /// finishes.
1304    ///
1305    /// The maximum time-out that this future will wait for depends on QUIC transport
1306    /// configurations of non-drained connections at the time of calling, and their current
1307    /// estimates of round trip time. With default parameters and a conservative estimate
1308    /// of round trip time, this call's future should take 3 seconds to resolve in cases of
1309    /// bad connectivity or failed connections. In the usual case, this call's future should
1310    /// return much more quickly.
1311    ///
1312    /// It is highly recommended you *do* wait for this close call to finish, if possible.
1313    /// Not doing so will make connections that were still open while closing the endpoint
1314    /// time out on the remote end. Thus remote ends will assume connections to have failed
1315    /// even if all application data was transmitted successfully.
1316    ///
1317    /// Note: Someone used to closing TCP sockets might wonder why it is necessary to wait
1318    /// for timeouts when closing QUIC endpoints, while they don't have to do this for TCP
1319    /// sockets. This is due to QUIC and its acknowledgments being implemented in user-land,
1320    /// while TCP sockets usually get closed and drained by the operating system in the
1321    /// kernel during the "Time-Wait" period of the TCP socket.
1322    ///
1323    /// Be aware however that the underlying UDP sockets are only closed once all clones of
1324    /// the the respective [`Endpoint`] are dropped.
1325    pub async fn close(&self) {
1326        self.sock.close().await;
1327    }
1328
1329    /// Check if this endpoint is still alive, or already closed.
1330    pub fn is_closed(&self) -> bool {
1331        self.sock.is_closed()
1332    }
1333
1334    /// Create a [`ServerConfigBuilder`] for this endpoint that includes the given alpns.
1335    ///
1336    /// Use the [`ServerConfigBuilder`] to customize the [`ServerConfig`] connection configuration
1337    /// for a connection accepted using the [`Incoming::accept_with`] method.
1338    pub fn create_server_config_builder(&self, alpns: Vec<Vec<u8>>) -> ServerConfigBuilder {
1339        let inner = self.static_config.create_server_config(alpns);
1340        ServerConfigBuilder::new(inner, self.static_config.transport_config.clone())
1341    }
1342
1343    // # Remaining private methods
1344
1345    #[cfg(test)]
1346    pub(crate) fn socket(&self) -> Handle {
1347        self.sock.clone()
1348    }
1349    #[cfg(test)]
1350    pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
1351        self.sock.endpoint()
1352    }
1353}
1354
1355/// Options for the [`Endpoint::connect_with_opts`] function.
1356#[derive(Default, Debug, Clone)]
1357pub struct ConnectOptions {
1358    transport_config: Option<QuicTransportConfig>,
1359    additional_alpns: Vec<Vec<u8>>,
1360}
1361
1362impl ConnectOptions {
1363    /// Initializes new connection options.
1364    ///
1365    /// By default, the connection will use the same options
1366    /// as [`Endpoint::connect`], e.g. a default [`QuicTransportConfig`].
1367    pub fn new() -> Self {
1368        Self::default()
1369    }
1370
1371    /// Sets the QUIC transport config options for this connection.
1372    pub fn with_transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
1373        self.transport_config = Some(transport_config);
1374        self
1375    }
1376
1377    /// Sets [ALPN] identifiers that should be signaled as supported on connection, *in
1378    /// addition* to the main [ALPN] identifier used in [`Endpoint::connect_with_opts`].
1379    ///
1380    /// This allows connecting to servers that may only support older versions of your
1381    /// protocol. In this case, you would add the older [ALPN] identifiers with this
1382    /// function.
1383    ///
1384    /// You'll know the final negotiated [ALPN] identifier once your connection was
1385    /// established using [`Connection::alpn`], or even slightly earlier in the
1386    /// handshake by using [`Connecting::alpn`].
1387    /// The negotiated [ALPN] identifier may be any of the [ALPN] identifiers in this
1388    /// list or the main [ALPN] used in [`Endpoint::connect_with_opts`].
1389    ///
1390    /// The [ALPN] identifier order on the connect side doesn't matter, since it's the
1391    /// accept side that determines the protocol.
1392    ///
1393    /// For setting the supported [ALPN] identifiers on the accept side, see the endpoint
1394    /// builder's [`Builder::alpns`] function.
1395    ///
1396    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
1397    pub fn with_additional_alpns(mut self, alpns: Vec<Vec<u8>>) -> Self {
1398        self.additional_alpns = alpns;
1399        self
1400    }
1401}
1402
1403/// Read a proxy url from the environment, in this order
1404///
1405/// - `HTTP_PROXY`
1406/// - `http_proxy`
1407/// - `HTTPS_PROXY`
1408/// - `https_proxy`
1409fn proxy_url_from_env() -> Option<Url> {
1410    if let Some(url) = std::env::var("HTTP_PROXY")
1411        .ok()
1412        .and_then(|s| s.parse::<Url>().ok())
1413    {
1414        if is_cgi() {
1415            warn!("HTTP_PROXY environment variable ignored in CGI");
1416        } else {
1417            return Some(url);
1418        }
1419    }
1420    if let Some(url) = std::env::var("http_proxy")
1421        .ok()
1422        .and_then(|s| s.parse::<Url>().ok())
1423    {
1424        return Some(url);
1425    }
1426    if let Some(url) = std::env::var("HTTPS_PROXY")
1427        .ok()
1428        .and_then(|s| s.parse::<Url>().ok())
1429    {
1430        return Some(url);
1431    }
1432    if let Some(url) = std::env::var("https_proxy")
1433        .ok()
1434        .and_then(|s| s.parse::<Url>().ok())
1435    {
1436        return Some(url);
1437    }
1438
1439    None
1440}
1441
1442/// Configuration of the relay servers for an [`Endpoint`].
1443#[derive(Debug, Clone, PartialEq, Eq)]
1444pub enum RelayMode {
1445    /// Disable relay servers completely.
1446    /// This means that neither listening nor dialing relays will be available.
1447    Disabled,
1448    /// Use the default relay map, with production relay servers from n0.
1449    ///
1450    /// See [`crate::defaults::prod`] for the severs used.
1451    Default,
1452    /// Use the staging relay servers from n0.
1453    Staging,
1454    /// Use a custom relay map.
1455    Custom(RelayMap),
1456}
1457
1458impl RelayMode {
1459    /// Returns the relay map for this mode.
1460    pub fn relay_map(&self) -> RelayMap {
1461        match self {
1462            RelayMode::Disabled => RelayMap::empty(),
1463            RelayMode::Default => crate::defaults::prod::default_relay_map(),
1464            RelayMode::Staging => crate::defaults::staging::default_relay_map(),
1465            RelayMode::Custom(relay_map) => relay_map.clone(),
1466        }
1467    }
1468
1469    /// Create a custom relay mode from a list of [`RelayUrl`]s.
1470    ///
1471    /// # Example
1472    ///
1473    /// ```
1474    /// # fn main() -> n0_error::Result<()> {
1475    /// # use iroh::RelayMode;
1476    /// RelayMode::custom([
1477    ///     "https://use1-1.relay.n0.iroh-canary.iroh.link.".parse()?,
1478    ///     "https://euw-1.relay.n0.iroh-canary.iroh.link.".parse()?,
1479    /// ]);
1480    /// # Ok(()) }
1481    /// ```
1482    pub fn custom(map: impl IntoIterator<Item = RelayUrl>) -> Self {
1483        let m = RelayMap::from_iter(map);
1484        Self::Custom(m)
1485    }
1486}
1487
1488/// Environment variable to force the use of staging relays.
1489pub const ENV_FORCE_STAGING_RELAYS: &str = "IROH_FORCE_STAGING_RELAYS";
1490
1491/// Returns `true` if the use of staging relays is forced.
1492pub fn force_staging_infra() -> bool {
1493    matches!(std::env::var(ENV_FORCE_STAGING_RELAYS), Ok(value) if !value.is_empty())
1494}
1495
1496/// Returns the default relay mode.
1497///
1498/// If the `IROH_FORCE_STAGING_RELAYS` environment variable is non empty, it will return `RelayMode::Staging`.
1499/// Otherwise, it will return `RelayMode::Default`.
1500pub fn default_relay_mode() -> RelayMode {
1501    // Use staging in testing
1502    match force_staging_infra() {
1503        true => RelayMode::Staging,
1504        false => RelayMode::Default,
1505    }
1506}
1507
1508/// Check if we are being executed in a CGI context.
1509///
1510/// If so, a malicious client can send the `Proxy:` header, and it will
1511/// be in the `HTTP_PROXY` env var. So we don't use it :)
1512fn is_cgi() -> bool {
1513    std::env::var_os("REQUEST_METHOD").is_some()
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518    use std::{
1519        collections::BTreeMap,
1520        io,
1521        net::{IpAddr, Ipv4Addr, Ipv6Addr},
1522        str::FromStr,
1523        sync::Arc,
1524        time::{Duration, Instant},
1525    };
1526
1527    use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
1528    use n0_error::{AnyError as Error, Result, StdResultExt};
1529    use n0_future::{BufferedStreamExt, StreamExt, stream, task::AbortOnDropHandle, time};
1530    use n0_tracing_test::traced_test;
1531    use n0_watcher::Watcher;
1532    use rand::SeedableRng;
1533    use tokio::sync::oneshot;
1534    use tracing::{Instrument, debug_span, info, info_span, instrument};
1535
1536    use super::Endpoint;
1537    use crate::{
1538        RelayMap, RelayMode,
1539        address_lookup::memory::MemoryLookup,
1540        endpoint::{
1541            ApplicationClose, BindError, BindOpts, ConnectOptions, Connection, ConnectionError,
1542            PathInfo,
1543        },
1544        protocol::{AcceptError, ProtocolHandler, Router},
1545        test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
1546    };
1547
1548    const TEST_ALPN: &[u8] = b"n0/iroh/test";
1549
1550    #[tokio::test]
1551    #[traced_test]
1552    async fn test_connect_self() -> Result {
1553        let ep = Endpoint::empty_builder(RelayMode::Disabled)
1554            .alpns(vec![TEST_ALPN.to_vec()])
1555            .bind()
1556            .await
1557            .unwrap();
1558        let my_addr = ep.addr();
1559        let res = ep.connect(my_addr.clone(), TEST_ALPN).await;
1560        assert!(res.is_err());
1561        let err = res.err().unwrap();
1562        assert!(err.to_string().starts_with("Connecting to ourself"));
1563
1564        Ok(())
1565    }
1566
1567    #[tokio::test]
1568    #[traced_test]
1569    async fn endpoint_connect_close() -> Result {
1570        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1571        let (relay_map, relay_url, _guard) = run_relay_server().await?;
1572        let server_secret_key = SecretKey::generate(&mut rng);
1573        let server_peer_id = server_secret_key.public();
1574
1575        let qlog = QlogFileGroup::from_env("endpoint_connect_close");
1576
1577        // Wait for the endpoint to be started to make sure it's up before clients try to connect
1578        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1579            .secret_key(server_secret_key)
1580            .transport_config(qlog.create("server")?)
1581            .alpns(vec![TEST_ALPN.to_vec()])
1582            .insecure_skip_relay_cert_verify(true)
1583            .bind()
1584            .await?;
1585        // Wait for the endpoint to be reachable via relay
1586        ep.online().await;
1587
1588        let server = tokio::spawn(
1589            async move {
1590                info!("accepting connection");
1591                let incoming = ep.accept().await.anyerr()?;
1592                let conn = incoming.await.anyerr()?;
1593                let mut stream = conn.accept_uni().await.anyerr()?;
1594                let mut buf = [0u8; 5];
1595                stream.read_exact(&mut buf).await.anyerr()?;
1596                info!("Accepted 1 stream, received {buf:?}.  Closing now.");
1597                // close the connection
1598                conn.close(7u8.into(), b"bye");
1599
1600                let res = conn.accept_uni().await;
1601                assert_eq!(res.unwrap_err(), ConnectionError::LocallyClosed);
1602
1603                let res = stream.read_to_end(10).await;
1604                assert_eq!(
1605                    res.unwrap_err(),
1606                    quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
1607                        ConnectionError::LocallyClosed
1608                    ))
1609                );
1610                info!("server test completed");
1611                Ok::<_, Error>(())
1612            }
1613            .instrument(info_span!("test-server")),
1614        );
1615
1616        let client = tokio::spawn(
1617            async move {
1618                let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1619                    .alpns(vec![TEST_ALPN.to_vec()])
1620                    .insecure_skip_relay_cert_verify(true)
1621                    .transport_config(qlog.create("client")?)
1622                    .bind()
1623                    .await?;
1624                info!("client connecting");
1625                let endpoint_addr = EndpointAddr::new(server_peer_id).with_relay_url(relay_url);
1626                let conn = ep.connect(endpoint_addr, TEST_ALPN).await?;
1627                let mut stream = conn.open_uni().await.anyerr()?;
1628
1629                // First write is accepted by server.  We need this bit of synchronisation
1630                // because if the server closes after simply accepting the connection we can
1631                // not be sure our .open_uni() call would succeed as it may already receive
1632                // the error.
1633                stream.write_all(b"hello").await.anyerr()?;
1634
1635                info!("waiting for closed");
1636                // Remote now closes the connection, we should see an error sometime soon.
1637                let err = conn.closed().await;
1638                let expected_err = ConnectionError::ApplicationClosed(ApplicationClose {
1639                    error_code: 7u8.into(),
1640                    reason: b"bye".to_vec().into(),
1641                });
1642                assert_eq!(err, expected_err);
1643
1644                info!("opening new - expect it to fail");
1645                let res = conn.open_uni().await;
1646                assert_eq!(res.unwrap_err(), expected_err);
1647                info!("client test completed");
1648                Ok::<_, Error>(())
1649            }
1650            .instrument(info_span!("test-client")),
1651        );
1652
1653        let (server, client) = tokio::time::timeout(
1654            Duration::from_secs(30),
1655            n0_future::future::zip(server, client),
1656        )
1657        .await
1658        .anyerr()?;
1659        server.anyerr()??;
1660        client.anyerr()??;
1661        Ok(())
1662    }
1663
1664    #[tokio::test]
1665    #[traced_test]
1666    async fn endpoint_relay_connect_loop() -> Result {
1667        let test_start = Instant::now();
1668        let n_clients = 5;
1669        let n_chunks_per_client = 2;
1670        let chunk_size = 100;
1671        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
1672        let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
1673        let server_secret_key = SecretKey::generate(&mut rng);
1674        let server_endpoint_id = server_secret_key.public();
1675
1676        // Make sure the server is bound before having clients connect to it:
1677        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1678            .insecure_skip_relay_cert_verify(true)
1679            .secret_key(server_secret_key)
1680            .alpns(vec![TEST_ALPN.to_vec()])
1681            .bind()
1682            .await?;
1683        // Also make sure the server has a working relay connection
1684        ep.online().await;
1685
1686        info!(time = ?test_start.elapsed(), "test setup done");
1687
1688        // The server accepts the connections of the clients sequentially.
1689        let server = tokio::spawn(
1690            async move {
1691                let eps = ep.bound_sockets();
1692
1693                info!(me = %ep.id().fmt_short(), eps = ?eps, "server listening on");
1694                for i in 0..n_clients {
1695                    tokio::time::timeout(Duration::from_secs(4), async {
1696                        let round_start = Instant::now();
1697                        info!("[server] round {i}");
1698                        let incoming = ep.accept().await.anyerr()?;
1699                        let conn = incoming.await.anyerr()?;
1700                        let endpoint_id = conn.remote_id();
1701                        info!(%i, peer = %endpoint_id.fmt_short(), "accepted connection");
1702                        let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1703                        let mut buf = vec![0u8; chunk_size];
1704                        for _i in 0..n_chunks_per_client {
1705                            recv.read_exact(&mut buf).await.anyerr()?;
1706                            send.write_all(&buf).await.anyerr()?;
1707                        }
1708                        info!(%i, peer = %endpoint_id.fmt_short(), "finishing");
1709                        send.finish().anyerr()?;
1710                        conn.closed().await; // we're the last to send data, so we wait for the other side to close
1711                        info!(%i, peer = %endpoint_id.fmt_short(), "finished");
1712                        info!("[server] round {i} done in {:?}", round_start.elapsed());
1713                        Ok::<_, Error>(())
1714                    })
1715                    .await
1716                    .std_context("timeout")??;
1717                }
1718                Ok::<_, Error>(())
1719            }
1720            .instrument(debug_span!("server")),
1721        );
1722
1723        let client = tokio::spawn(async move {
1724            for i in 0..n_clients {
1725                let round_start = Instant::now();
1726                info!("[client] round {i}");
1727                let client_secret_key = SecretKey::generate(&mut rng);
1728                tokio::time::timeout(
1729                    Duration::from_secs(4),
1730                    async {
1731                        info!("client binding");
1732                        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1733                            .alpns(vec![TEST_ALPN.to_vec()])
1734                            .insecure_skip_relay_cert_verify(true)
1735                            .secret_key(client_secret_key)
1736                            .bind()
1737                            .await?;
1738                        let eps = ep.bound_sockets();
1739
1740                        info!(me = %ep.id().fmt_short(), eps=?eps, "client bound");
1741                        let endpoint_addr =
1742                            EndpointAddr::new(server_endpoint_id).with_relay_url(relay_url.clone());
1743                        info!(to = ?endpoint_addr, "client connecting");
1744                        let conn = ep.connect(endpoint_addr, TEST_ALPN).await.anyerr()?;
1745                        info!("client connected");
1746                        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1747
1748                        for i in 0..n_chunks_per_client {
1749                            let mut buf = vec![i; chunk_size];
1750                            send.write_all(&buf).await.anyerr()?;
1751                            recv.read_exact(&mut buf).await.anyerr()?;
1752                            assert_eq!(buf, vec![i; chunk_size]);
1753                        }
1754                        // we're the last to receive data, so we close
1755                        conn.close(0u32.into(), b"bye!");
1756                        info!("client finished");
1757                        ep.close().await;
1758                        info!("client closed");
1759
1760                        Ok::<_, Error>(())
1761                    }
1762                    .instrument(debug_span!("client", %i)),
1763                )
1764                .await
1765                .std_context("timeout")??;
1766                info!("[client] round {i} done in {:?}", round_start.elapsed());
1767            }
1768            Ok::<_, Error>(())
1769        });
1770
1771        server.await.anyerr()??;
1772        client.await.anyerr()??;
1773        Ok(())
1774    }
1775
1776    #[tokio::test]
1777    #[traced_test]
1778    async fn endpoint_send_relay() -> Result {
1779        let (relay_map, _relay_url, _guard) = run_relay_server().await?;
1780        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1781            .insecure_skip_relay_cert_verify(true)
1782            .bind()
1783            .await?;
1784        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1785            .insecure_skip_relay_cert_verify(true)
1786            .alpns(vec![TEST_ALPN.to_vec()])
1787            .bind()
1788            .await?;
1789
1790        let task = tokio::spawn({
1791            let server = server.clone();
1792            async move {
1793                let Some(conn) = server.accept().await else {
1794                    n0_error::bail_any!("Expected an incoming connection");
1795                };
1796                let conn = conn.await.anyerr()?;
1797                let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
1798                let data = recv.read_to_end(1000).await.anyerr()?;
1799                send.write_all(&data).await.anyerr()?;
1800                send.finish().anyerr()?;
1801                conn.closed().await;
1802
1803                Ok::<_, Error>(())
1804            }
1805        });
1806
1807        let addr = server.addr();
1808        let conn = client.connect(addr, TEST_ALPN).await?;
1809        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
1810        send.write_all(b"Hello, world!").await.anyerr()?;
1811        send.finish().anyerr()?;
1812        let data = recv.read_to_end(1000).await.anyerr()?;
1813        conn.close(0u32.into(), b"bye!");
1814
1815        task.await.anyerr()??;
1816
1817        client.close().await;
1818        server.close().await;
1819
1820        assert_eq!(&data, b"Hello, world!");
1821
1822        Ok(())
1823    }
1824
1825    #[tokio::test]
1826    #[traced_test]
1827    async fn endpoint_two_direct_only() -> Result {
1828        // Connect two endpoints on the same network, without a relay server, without
1829        // Address Lookup.
1830        let ep1 = {
1831            let span = info_span!("server");
1832            let _guard = span.enter();
1833            Endpoint::builder()
1834                .alpns(vec![TEST_ALPN.to_vec()])
1835                .relay_mode(RelayMode::Disabled)
1836                .bind()
1837                .await?
1838        };
1839        let ep2 = {
1840            let span = info_span!("client");
1841            let _guard = span.enter();
1842            Endpoint::builder()
1843                .alpns(vec![TEST_ALPN.to_vec()])
1844                .relay_mode(RelayMode::Disabled)
1845                .bind()
1846                .await?
1847        };
1848        let ep1_nodeaddr = ep1.addr();
1849
1850        #[instrument(name = "client", skip_all)]
1851        async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<ConnectionError> {
1852            info!(me = %ep.id().fmt_short(), "client starting");
1853            let conn = ep.connect(dst, TEST_ALPN).await?;
1854            let mut send = conn.open_uni().await.anyerr()?;
1855            send.write_all(b"hello").await.anyerr()?;
1856            send.finish().anyerr()?;
1857            Ok(conn.closed().await)
1858        }
1859
1860        #[instrument(name = "server", skip_all)]
1861        async fn accept(ep: Endpoint, src: EndpointId) -> Result {
1862            info!(me = %ep.id().fmt_short(), "server starting");
1863            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1864            let node_id = conn.remote_id();
1865            assert_eq!(node_id, src);
1866            let mut recv = conn.accept_uni().await.anyerr()?;
1867            let msg = recv.read_to_end(100).await.anyerr()?;
1868            assert_eq!(msg, b"hello");
1869            // Dropping the connection closes it just fine.
1870            Ok(())
1871        }
1872
1873        let ep1_accept = tokio::spawn(accept(ep1.clone(), ep2.id()));
1874        let ep2_connect = tokio::spawn(connect(ep2.clone(), ep1_nodeaddr));
1875
1876        ep1_accept.await.anyerr()??;
1877        let conn_closed = dbg!(ep2_connect.await.anyerr()??);
1878        assert!(matches!(
1879            conn_closed,
1880            ConnectionError::ApplicationClosed(ApplicationClose { .. })
1881        ));
1882
1883        Ok(())
1884    }
1885
1886    #[tokio::test]
1887    #[traced_test]
1888    async fn endpoint_two_relay_only_becomes_direct() -> Result {
1889        // Connect two endpoints on the same network, via a relay server, without
1890        // Address Lookup.  Wait until there is a direct connection.
1891        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1892        let (node_addr_tx, node_addr_rx) = oneshot::channel();
1893        let qlog = Arc::new(QlogFileGroup::from_env("two_relay_only_becomes_direct"));
1894
1895        #[instrument(name = "client", skip_all)]
1896        async fn connect(
1897            relay_map: RelayMap,
1898            node_addr_rx: oneshot::Receiver<EndpointAddr>,
1899            qlog: Arc<QlogFileGroup>,
1900        ) -> Result<ConnectionError> {
1901            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1902            let secret = SecretKey::generate(&mut rng);
1903            let ep = Endpoint::builder()
1904                .secret_key(secret)
1905                .alpns(vec![TEST_ALPN.to_vec()])
1906                .insecure_skip_relay_cert_verify(true)
1907                .relay_mode(RelayMode::Custom(relay_map))
1908                .transport_config(qlog.create("client")?)
1909                .bind()
1910                .await?;
1911            info!(me = %ep.id().fmt_short(), "client starting");
1912            let dst = node_addr_rx.await.anyerr()?;
1913
1914            info!(me = %ep.id().fmt_short(), "client connecting");
1915            let conn = ep.connect(dst, TEST_ALPN).await?;
1916            let mut send = conn.open_uni().await.anyerr()?;
1917            send.write_all(b"hello").await.anyerr()?;
1918            let mut paths = conn.paths().stream();
1919            info!("Waiting for direct connection");
1920            while let Some(infos) = paths.next().await {
1921                info!(?infos, "new PathInfos");
1922                if infos.iter().any(|info| info.is_ip()) {
1923                    break;
1924                }
1925            }
1926            info!("Have direct connection");
1927            // Validate holepunch metrics.
1928            assert_eq!(ep.metrics().socket.num_conns_opened.get(), 1);
1929            assert_eq!(ep.metrics().socket.num_conns_direct.get(), 1);
1930
1931            send.write_all(b"close please").await.anyerr()?;
1932            send.finish().anyerr()?;
1933
1934            Ok(conn.closed().await)
1935        }
1936
1937        #[instrument(name = "server", skip_all)]
1938        async fn accept(
1939            relay_map: RelayMap,
1940            node_addr_tx: oneshot::Sender<EndpointAddr>,
1941            qlog: Arc<QlogFileGroup>,
1942        ) -> Result {
1943            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
1944            let secret = SecretKey::generate(&mut rng);
1945            let ep = Endpoint::builder()
1946                .secret_key(secret)
1947                .alpns(vec![TEST_ALPN.to_vec()])
1948                .insecure_skip_relay_cert_verify(true)
1949                .transport_config(qlog.create("server")?)
1950                .relay_mode(RelayMode::Custom(relay_map))
1951                .bind()
1952                .await?;
1953            ep.online().await;
1954            let mut node_addr = ep.addr();
1955            node_addr.addrs.retain(|addr| addr.is_relay());
1956            node_addr_tx.send(node_addr).unwrap();
1957
1958            info!(me = %ep.id().fmt_short(), "server starting");
1959            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
1960            // let node_id = conn.remote_node_id()?;
1961            // assert_eq!(node_id, src);
1962            let mut recv = conn.accept_uni().await.anyerr()?;
1963            let mut msg = [0u8; 5];
1964            recv.read_exact(&mut msg).await.anyerr()?;
1965            assert_eq!(&msg, b"hello");
1966            info!("received hello");
1967            let msg = recv.read_to_end(100).await.anyerr()?;
1968            assert_eq!(msg, b"close please");
1969            info!("received 'close please'");
1970            // Dropping the connection closes it just fine.
1971            Ok(())
1972        }
1973
1974        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx, qlog.clone()));
1975        let client_task = tokio::spawn(connect(relay_map, node_addr_rx, qlog));
1976
1977        server_task.await.anyerr()??;
1978        let conn_closed = dbg!(client_task.await.anyerr()??);
1979        assert!(matches!(
1980            conn_closed,
1981            ConnectionError::ApplicationClosed(ApplicationClose { .. })
1982        ));
1983
1984        Ok(())
1985    }
1986
1987    #[tokio::test]
1988    #[traced_test]
1989    async fn endpoint_two_relay_only_no_ip() -> Result {
1990        // Connect two endpoints on the same network, via a relay server, without
1991        // Address Lookup.
1992        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
1993        let (node_addr_tx, node_addr_rx) = oneshot::channel();
1994
1995        #[instrument(name = "client", skip_all)]
1996        async fn connect(
1997            relay_map: RelayMap,
1998            node_addr_rx: oneshot::Receiver<EndpointAddr>,
1999        ) -> Result<ConnectionError> {
2000            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
2001            let secret = SecretKey::generate(&mut rng);
2002            let ep = Endpoint::builder()
2003                .secret_key(secret)
2004                .alpns(vec![TEST_ALPN.to_vec()])
2005                .insecure_skip_relay_cert_verify(true)
2006                .relay_mode(RelayMode::Custom(relay_map))
2007                .clear_ip_transports() // disable direct
2008                .bind()
2009                .await?;
2010            info!(me = %ep.id().fmt_short(), "client starting");
2011            let dst = node_addr_rx.await.anyerr()?;
2012
2013            info!(me = %ep.id().fmt_short(), "client connecting");
2014            let conn = ep.connect(dst, TEST_ALPN).await?;
2015            let mut send = conn.open_uni().await.anyerr()?;
2016            send.write_all(b"hello").await.anyerr()?;
2017            let mut paths = conn.paths().stream();
2018            info!("Waiting for connection");
2019            'outer: while let Some(infos) = paths.next().await {
2020                info!(?infos, "new PathInfos");
2021                for info in infos {
2022                    if info.is_ip() {
2023                        panic!("should not happen: {:?}", info);
2024                    }
2025                    if info.is_relay() {
2026                        break 'outer;
2027                    }
2028                }
2029            }
2030            info!("Have relay connection");
2031            send.write_all(b"close please").await.anyerr()?;
2032            send.finish().anyerr()?;
2033            Ok(conn.closed().await)
2034        }
2035
2036        #[instrument(name = "server", skip_all)]
2037        async fn accept(
2038            relay_map: RelayMap,
2039            node_addr_tx: oneshot::Sender<EndpointAddr>,
2040        ) -> Result {
2041            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
2042            let secret = SecretKey::generate(&mut rng);
2043            let ep = Endpoint::builder()
2044                .secret_key(secret)
2045                .alpns(vec![TEST_ALPN.to_vec()])
2046                .insecure_skip_relay_cert_verify(true)
2047                .relay_mode(RelayMode::Custom(relay_map))
2048                .clear_ip_transports()
2049                .bind()
2050                .await?;
2051            ep.online().await;
2052            let node_addr = ep.addr();
2053            node_addr_tx.send(node_addr).unwrap();
2054
2055            info!(me = %ep.id().fmt_short(), "server starting");
2056            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
2057            // let node_id = conn.remote_node_id()?;
2058            // assert_eq!(node_id, src);
2059            let mut recv = conn.accept_uni().await.anyerr()?;
2060            let mut msg = [0u8; 5];
2061            recv.read_exact(&mut msg).await.anyerr()?;
2062            assert_eq!(&msg, b"hello");
2063            info!("received hello");
2064            let msg = recv.read_to_end(100).await.anyerr()?;
2065            assert_eq!(msg, b"close please");
2066            info!("received 'close please'");
2067            // Dropping the connection closes it just fine.
2068            Ok(())
2069        }
2070
2071        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
2072        let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
2073
2074        server_task.await.anyerr()??;
2075        let conn_closed = dbg!(client_task.await.anyerr()??);
2076        assert!(matches!(
2077            conn_closed,
2078            ConnectionError::ApplicationClosed(ApplicationClose { .. })
2079        ));
2080
2081        Ok(())
2082    }
2083
2084    #[tokio::test]
2085    #[traced_test]
2086    async fn endpoint_two_direct_add_relay() -> Result {
2087        // Connect two endpoints on the same network, without relay server and without
2088        // Address Lookup.  Add a relay connection later.
2089        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
2090        let (node_addr_tx, node_addr_rx) = oneshot::channel();
2091
2092        #[instrument(name = "client", skip_all)]
2093        async fn connect(
2094            relay_map: RelayMap,
2095            node_addr_rx: oneshot::Receiver<EndpointAddr>,
2096        ) -> Result<()> {
2097            let secret = SecretKey::from([0u8; 32]);
2098            let ep = Endpoint::builder()
2099                .secret_key(secret)
2100                .alpns(vec![TEST_ALPN.to_vec()])
2101                .insecure_skip_relay_cert_verify(true)
2102                .relay_mode(RelayMode::Custom(relay_map))
2103                .bind()
2104                .await?;
2105            info!(me = %ep.id().fmt_short(), "client starting");
2106            let dst = node_addr_rx.await.anyerr()?;
2107
2108            info!(me = %ep.id().fmt_short(), "client connecting");
2109            let conn = ep.connect(dst, TEST_ALPN).await?;
2110            info!(me = %ep.id().fmt_short(), "client connected");
2111
2112            // We should be connected via IP, because it is faster than the relay server.
2113            // TODO: Maybe not panic if this is not true?
2114            let path_info = conn.paths().get();
2115            assert_eq!(path_info.len(), 1);
2116            assert!(path_info.iter().next().unwrap().is_ip());
2117
2118            let mut paths = conn.paths().stream();
2119            time::timeout(Duration::from_secs(5), async move {
2120                while let Some(infos) = paths.next().await {
2121                    info!(?infos, "new PathInfos");
2122                    if infos.iter().any(|info| info.is_relay()) {
2123                        info!("client has a relay path");
2124                        break;
2125                    }
2126                }
2127            })
2128            .await
2129            .anyerr()?;
2130
2131            // wait for the server to signal it has the relay connection
2132            let mut stream = conn.accept_uni().await.anyerr()?;
2133            stream.read_to_end(100).await.anyerr()?;
2134
2135            info!("client closing");
2136            conn.close(0u8.into(), b"");
2137            ep.close().await;
2138            Ok(())
2139        }
2140
2141        #[instrument(name = "server", skip_all)]
2142        async fn accept(
2143            relay_map: RelayMap,
2144            node_addr_tx: oneshot::Sender<EndpointAddr>,
2145        ) -> Result<ConnectionError> {
2146            let secret = SecretKey::from([1u8; 32]);
2147            let ep = Endpoint::builder()
2148                .secret_key(secret)
2149                .alpns(vec![TEST_ALPN.to_vec()])
2150                .insecure_skip_relay_cert_verify(true)
2151                .relay_mode(RelayMode::Custom(relay_map))
2152                .bind()
2153                .await?;
2154            ep.online().await;
2155            let node_addr = ep.addr();
2156            node_addr_tx.send(node_addr).unwrap();
2157
2158            info!(me = %ep.id().fmt_short(), "server starting");
2159            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
2160            info!(me = %ep.id().fmt_short(), "server accepted connection");
2161
2162            // Wait for a relay connection to be added.  Client does all the asserting here,
2163            // we just want to wait so we get to see all the mechanics of the connection
2164            // being added on this side too.
2165            let mut paths = conn.paths().stream();
2166            time::timeout(Duration::from_secs(5), async move {
2167                while let Some(infos) = paths.next().await {
2168                    info!(?infos, "new PathInfos");
2169                    if infos.iter().any(|path| path.is_relay()) {
2170                        info!("server has a relay path");
2171                        break;
2172                    }
2173                }
2174            })
2175            .await
2176            .anyerr()?;
2177
2178            let mut stream = conn.open_uni().await.anyerr()?;
2179            stream.write_all(b"have relay").await.anyerr()?;
2180            stream.finish().anyerr()?;
2181            info!("waiting conn.closed()");
2182
2183            Ok(conn.closed().await)
2184        }
2185
2186        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
2187        let client_task = tokio::spawn(connect(relay_map, node_addr_rx));
2188
2189        client_task.await.anyerr()??;
2190        let conn_closed = dbg!(server_task.await.anyerr()??);
2191        assert!(matches!(
2192            conn_closed,
2193            ConnectionError::ApplicationClosed(ApplicationClose { .. })
2194        ));
2195
2196        Ok(())
2197    }
2198
2199    #[tokio::test]
2200    #[traced_test]
2201    async fn endpoint_relay_map_change() -> Result {
2202        let (relay_map, relay_url, _guard1) = run_relay_server().await?;
2203        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
2204            .insecure_skip_relay_cert_verify(true)
2205            .bind()
2206            .await?;
2207        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2208            .insecure_skip_relay_cert_verify(true)
2209            .alpns(vec![TEST_ALPN.to_vec()])
2210            .bind()
2211            .await?;
2212
2213        let task = tokio::spawn({
2214            let server = server.clone();
2215            async move {
2216                for i in 0..2 {
2217                    println!("accept: round {i}");
2218                    let Some(conn) = server.accept().await else {
2219                        n0_error::bail_any!("Expected an incoming connection");
2220                    };
2221                    let conn = conn.await.anyerr()?;
2222                    let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2223                    let data = recv.read_to_end(1000).await.anyerr()?;
2224                    send.write_all(&data).await.anyerr()?;
2225                    send.finish().anyerr()?;
2226                    conn.closed().await;
2227                }
2228                Ok::<_, Error>(())
2229            }
2230        });
2231
2232        server.online().await;
2233
2234        let mut addr = server.addr();
2235        println!("round1: {:?}", addr);
2236
2237        // remove direct addrs to force relay usage
2238        addr.addrs
2239            .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2240
2241        let conn = client.connect(addr, TEST_ALPN).await?;
2242        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2243        send.write_all(b"Hello, world!").await.anyerr()?;
2244        send.finish().anyerr()?;
2245        let data = recv.read_to_end(1000).await.anyerr()?;
2246        conn.close(0u32.into(), b"bye!");
2247
2248        assert_eq!(&data, b"Hello, world!");
2249
2250        // setup a second relay server
2251        let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
2252        let new_endpoint = new_relay_map
2253            .get(&new_relay_url)
2254            .expect("missing endpoint")
2255            .clone();
2256        dbg!(&new_relay_map);
2257
2258        let addr_watcher = server.watch_addr();
2259
2260        // add new new relay
2261        assert!(
2262            server
2263                .insert_relay(new_relay_url.clone(), new_endpoint.clone())
2264                .await
2265                .is_none()
2266        );
2267        // remove the old relay
2268        assert!(server.remove_relay(&relay_url).await.is_some());
2269
2270        println!("------- changed ----- ");
2271
2272        let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
2273            let mut stream = addr_watcher.stream();
2274            while let Some(addr) = stream.next().await {
2275                if addr.relay_urls().next() != Some(&relay_url) {
2276                    return addr;
2277                }
2278            }
2279            panic!("failed to change relay");
2280        })
2281        .await
2282        .anyerr()?;
2283
2284        println!("round2: {:?}", addr);
2285        assert_eq!(addr.relay_urls().next(), Some(&new_relay_url));
2286
2287        // remove direct addrs to force relay usage
2288        addr.addrs
2289            .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));
2290
2291        let conn = client.connect(addr, TEST_ALPN).await?;
2292        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2293        send.write_all(b"Hello, world!").await.anyerr()?;
2294        send.finish().anyerr()?;
2295        let data = recv.read_to_end(1000).await.anyerr()?;
2296        conn.close(0u32.into(), b"bye!");
2297
2298        task.await.anyerr()??;
2299
2300        client.close().await;
2301        server.close().await;
2302
2303        assert_eq!(&data, b"Hello, world!");
2304
2305        Ok(())
2306    }
2307
2308    #[tokio::test]
2309    #[traced_test]
2310    async fn endpoint_bidi_send_recv() -> Result {
2311        let disco = MemoryLookup::new();
2312        let ep1 = Endpoint::empty_builder(RelayMode::Disabled)
2313            .address_lookup(disco.clone())
2314            .alpns(vec![TEST_ALPN.to_vec()])
2315            .bind()
2316            .await?;
2317
2318        let ep2 = Endpoint::empty_builder(RelayMode::Disabled)
2319            .address_lookup(disco.clone())
2320            .alpns(vec![TEST_ALPN.to_vec()])
2321            .bind()
2322            .await?;
2323
2324        disco.add_endpoint_info(ep1.addr());
2325        disco.add_endpoint_info(ep2.addr());
2326
2327        let ep1_endpointid = ep1.id();
2328        let ep2_endpointid = ep2.id();
2329        eprintln!("endpoint id 1 {ep1_endpointid}");
2330        eprintln!("endpoint id 2 {ep2_endpointid}");
2331
2332        async fn connect_hello(ep: Endpoint, dst: EndpointId) -> Result {
2333            let conn = ep.connect(dst, TEST_ALPN).await?;
2334            let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2335            info!("sending hello");
2336            send.write_all(b"hello").await.anyerr()?;
2337            send.finish().anyerr()?;
2338            info!("receiving world");
2339            let m = recv.read_to_end(100).await.anyerr()?;
2340            assert_eq!(m, b"world");
2341            conn.close(1u8.into(), b"done");
2342            Ok(())
2343        }
2344
2345        async fn accept_world(ep: Endpoint, src: EndpointId) -> Result {
2346            let incoming = ep.accept().await.anyerr()?;
2347            let mut iconn = incoming.accept().anyerr()?;
2348            let alpn = iconn.alpn().await?;
2349            let conn = iconn.await.anyerr()?;
2350            let endpoint_id = conn.remote_id();
2351            assert_eq!(endpoint_id, src);
2352            assert_eq!(alpn, TEST_ALPN);
2353            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2354            info!("receiving hello");
2355            let m = recv.read_to_end(100).await.anyerr()?;
2356            assert_eq!(m, b"hello");
2357            info!("sending hello");
2358            send.write_all(b"world").await.anyerr()?;
2359            send.finish().anyerr()?;
2360            match conn.closed().await {
2361                ConnectionError::ApplicationClosed(closed) => {
2362                    assert_eq!(closed.error_code, 1u8.into());
2363                    Ok(())
2364                }
2365                _ => panic!("wrong close error"),
2366            }
2367        }
2368
2369        let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_endpointid).instrument(
2370            info_span!(
2371                "p1_accept",
2372                ep1 = %ep1.id().fmt_short(),
2373                dst = %ep2_endpointid.fmt_short(),
2374            ),
2375        ));
2376        let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_endpointid).instrument(
2377            info_span!(
2378                "p2_accept",
2379                ep2 = %ep2.id().fmt_short(),
2380                dst = %ep1_endpointid.fmt_short(),
2381            ),
2382        ));
2383        let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_endpointid).instrument(
2384            info_span!(
2385                "p1_connect",
2386                ep1 = %ep1.id().fmt_short(),
2387                dst = %ep2_endpointid.fmt_short(),
2388            ),
2389        ));
2390        let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_endpointid).instrument(
2391            info_span!(
2392                "p2_connect",
2393                ep2 = %ep2.id().fmt_short(),
2394                dst = %ep1_endpointid.fmt_short(),
2395            ),
2396        ));
2397
2398        p1_accept.await.anyerr()??;
2399        p2_accept.await.anyerr()??;
2400        p1_connect.await.anyerr()??;
2401        p2_connect.await.anyerr()??;
2402
2403        Ok(())
2404    }
2405
2406    #[tokio::test]
2407    #[traced_test]
2408    async fn test_direct_addresses_no_qad_relay() -> Result {
2409        let (relay_map, _, _guard) = run_relay_server_with(false).await.unwrap();
2410
2411        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2412            .alpns(vec![TEST_ALPN.to_vec()])
2413            .insecure_skip_relay_cert_verify(true)
2414            .bind()
2415            .await?;
2416
2417        assert!(ep.addr().ip_addrs().count() > 0);
2418
2419        Ok(())
2420    }
2421
2422    #[cfg_attr(target_os = "windows", ignore = "flaky")]
2423    #[tokio::test]
2424    #[traced_test]
2425    async fn graceful_close() -> Result {
2426        let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2427        let server = Endpoint::empty_builder(RelayMode::Disabled)
2428            .alpns(vec![TEST_ALPN.to_vec()])
2429            .bind()
2430            .await?;
2431        let server_addr = server.addr();
2432        let server_task = tokio::spawn(async move {
2433            let incoming = server.accept().await.anyerr()?;
2434            let conn = incoming.await.anyerr()?;
2435            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
2436            let msg = recv.read_to_end(1_000).await.anyerr()?;
2437            send.write_all(&msg).await.anyerr()?;
2438            send.finish().anyerr()?;
2439            let close_reason = conn.closed().await;
2440            Ok::<_, Error>(close_reason)
2441        });
2442
2443        let conn = client.connect(server_addr, TEST_ALPN).await?;
2444        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
2445        send.write_all(b"Hello, world!").await.anyerr()?;
2446        send.finish().anyerr()?;
2447        recv.read_to_end(1_000).await.anyerr()?;
2448        conn.close(42u32.into(), b"thanks, bye!");
2449        client.close().await;
2450
2451        let close_err = server_task.await.anyerr()??;
2452        let ConnectionError::ApplicationClosed(app_close) = close_err else {
2453            panic!("Unexpected close reason: {close_err:?}");
2454        };
2455
2456        assert_eq!(app_close.error_code, 42u32.into());
2457        assert_eq!(app_close.reason.as_ref(), b"thanks, bye!");
2458
2459        Ok(())
2460    }
2461
2462    #[cfg(feature = "metrics")]
2463    #[tokio::test]
2464    #[traced_test]
2465    async fn metrics_smoke() -> Result {
2466        use iroh_metrics::Registry;
2467
2468        let secret_key = SecretKey::from_bytes(&[0u8; 32]);
2469        let client = Endpoint::empty_builder(RelayMode::Disabled)
2470            .secret_key(secret_key)
2471            .bind()
2472            .await?;
2473        let secret_key = SecretKey::from_bytes(&[1u8; 32]);
2474        let server = Endpoint::empty_builder(RelayMode::Disabled)
2475            .secret_key(secret_key)
2476            .alpns(vec![TEST_ALPN.to_vec()])
2477            .bind()
2478            .await?;
2479        let server_addr = server.addr();
2480        let server_task = tokio::task::spawn(async move {
2481            let conn = server.accept().await.anyerr()?.await.anyerr()?;
2482            let mut uni = conn.accept_uni().await.anyerr()?;
2483            uni.read_to_end(10).await.anyerr()?;
2484            drop(conn);
2485            Ok::<_, Error>(server)
2486        });
2487        let conn = client.connect(server_addr, TEST_ALPN).await?;
2488        let mut uni = conn.open_uni().await.anyerr()?;
2489        uni.write_all(b"helloworld").await.anyerr()?;
2490        uni.finish().anyerr()?;
2491        conn.closed().await;
2492        drop(conn);
2493        let server = server_task.await.anyerr()??;
2494
2495        let m = client.metrics();
2496        // assert_eq!(m.socket.num_direct_conns_added.get(), 1);
2497        // assert_eq!(m.socket.connection_became_direct.get(), 1);
2498        // assert_eq!(m.socket.connection_handshake_success.get(), 1);
2499        // assert_eq!(m.socket.endpoints_contacted_directly.get(), 1);
2500        assert!(m.socket.recv_datagrams.get() > 0);
2501
2502        let m = server.metrics();
2503        // assert_eq!(m.socket.num_direct_conns_added.get(), 1);
2504        // assert_eq!(m.socket.connection_became_direct.get(), 1);
2505        // assert_eq!(m.socket.endpoints_contacted_directly.get(), 1);
2506        // assert_eq!(m.socket.connection_handshake_success.get(), 1);
2507        assert!(m.socket.recv_datagrams.get() > 0);
2508
2509        // test openmetrics encoding with labeled subregistries per endpoint
2510        fn register_endpoint(registry: &mut Registry, endpoint: &Endpoint) {
2511            let id = endpoint.id().fmt_short();
2512            let sub_registry = registry.sub_registry_with_label("id", id.to_string());
2513            sub_registry.register_all(endpoint.metrics());
2514        }
2515        let mut registry = Registry::default();
2516        register_endpoint(&mut registry, &client);
2517        register_endpoint(&mut registry, &server);
2518        // let s = registry.encode_openmetrics_to_string().anyerr()?;
2519        // assert!(s.contains(r#"socket_endpoints_contacted_directly_total{id="3b6a27bcce"} 1"#));
2520        // assert!(s.contains(r#"socket_endpoints_contacted_directly_total{id="8a88e3dd74"} 1"#));
2521        Ok(())
2522    }
2523
2524    /// Configures the accept side to take `accept_alpns` ALPNs, then connects to it with `primary_connect_alpn`
2525    /// with `secondary_connect_alpns` set, and finally returns the negotiated ALPN.
2526    async fn alpn_connection_test(
2527        accept_alpns: Vec<Vec<u8>>,
2528        primary_connect_alpn: &[u8],
2529        secondary_connect_alpns: Vec<Vec<u8>>,
2530    ) -> Result<Vec<u8>> {
2531        let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
2532        let server = Endpoint::empty_builder(RelayMode::Disabled)
2533            .alpns(accept_alpns)
2534            .bind()
2535            .await?;
2536        let server_addr = server.addr();
2537        let server_task = tokio::spawn({
2538            let server = server.clone();
2539            async move {
2540                let incoming = server.accept().await.anyerr()?;
2541                let conn = incoming.await.anyerr()?;
2542                conn.close(0u32.into(), b"bye!");
2543                n0_error::Ok(conn.alpn().to_vec())
2544            }
2545        });
2546
2547        let conn = client
2548            .connect_with_opts(
2549                server_addr,
2550                primary_connect_alpn,
2551                ConnectOptions::new().with_additional_alpns(secondary_connect_alpns),
2552            )
2553            .await?;
2554        let conn = conn.await.anyerr()?;
2555        let client_alpn = conn.alpn();
2556        conn.closed().await;
2557        client.close().await;
2558        server.close().await;
2559
2560        let server_alpn = server_task.await.anyerr()??;
2561
2562        assert_eq!(client_alpn, server_alpn);
2563
2564        Ok(server_alpn.to_vec())
2565    }
2566
2567    #[tokio::test]
2568    #[traced_test]
2569    async fn connect_multiple_alpn_negotiated() -> Result {
2570        const ALPN_ONE: &[u8] = b"alpn/1";
2571        const ALPN_TWO: &[u8] = b"alpn/2";
2572
2573        assert_eq!(
2574            alpn_connection_test(
2575                // Prefer version 2 over version 1 on the accept side
2576                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2577                ALPN_TWO,
2578                vec![ALPN_ONE.to_vec()],
2579            )
2580            .await?,
2581            ALPN_TWO.to_vec(),
2582            "accept side prefers version 2 over 1"
2583        );
2584
2585        assert_eq!(
2586            alpn_connection_test(
2587                // Only support the old version
2588                vec![ALPN_ONE.to_vec()],
2589                ALPN_TWO,
2590                vec![ALPN_ONE.to_vec()],
2591            )
2592            .await?,
2593            ALPN_ONE.to_vec(),
2594            "accept side only supports the old version"
2595        );
2596
2597        assert_eq!(
2598            alpn_connection_test(
2599                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
2600                ALPN_ONE,
2601                vec![ALPN_TWO.to_vec()],
2602            )
2603            .await?,
2604            ALPN_TWO.to_vec(),
2605            "connect side ALPN order doesn't matter"
2606        );
2607
2608        assert_eq!(
2609            alpn_connection_test(vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()], ALPN_ONE, vec![],)
2610                .await?,
2611            ALPN_ONE.to_vec(),
2612            "connect side only supports the old version"
2613        );
2614
2615        Ok(())
2616    }
2617
2618    #[tokio::test]
2619    #[traced_test]
2620    async fn watch_net_report() -> Result {
2621        let endpoint = Endpoint::empty_builder(RelayMode::Staging).bind().await?;
2622
2623        // can get a first report
2624        endpoint.net_report().updated().await.anyerr()?;
2625
2626        Ok(())
2627    }
2628
2629    /// Tests that initial connection establishment isn't extremely slow compared
2630    /// to subsequent connections.
2631    ///
2632    /// This is a time based test, but uses a very large ratio to reduce flakiness.
2633    /// It also does a number of connections to average out any anomalies.
2634    #[tokio::test]
2635    #[traced_test]
2636    async fn connect_multi_time() -> Result {
2637        let n = 32;
2638
2639        const NOOP_ALPN: &[u8] = b"noop";
2640
2641        #[derive(Debug, Clone)]
2642        struct Noop;
2643
2644        impl ProtocolHandler for Noop {
2645            async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
2646                connection.closed().await;
2647                Ok(())
2648            }
2649        }
2650
2651        async fn noop_server() -> Result<(Router, EndpointAddr)> {
2652            let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2653                .bind()
2654                .await
2655                .anyerr()?;
2656            let addr = endpoint.addr();
2657            let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
2658            Ok((router, addr))
2659        }
2660
2661        let routers = stream::iter(0..n)
2662            .map(|_| noop_server())
2663            .buffered_unordered(32)
2664            .collect::<Vec<_>>()
2665            .await
2666            .into_iter()
2667            .collect::<Result<Vec<_>, _>>()
2668            .anyerr()?;
2669
2670        let addrs = routers
2671            .iter()
2672            .map(|(_, addr)| addr.clone())
2673            .collect::<Vec<_>>();
2674        let ids = addrs.iter().map(|addr| addr.id).collect::<Vec<_>>();
2675        let address_lookup = MemoryLookup::from_endpoint_info(addrs);
2676        let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
2677            .address_lookup(address_lookup)
2678            .bind()
2679            .await
2680            .anyerr()?;
2681        // wait for the endpoint to be initialized. This should not be needed,
2682        // but we don't want to measure endpoint init time but connection time
2683        // from a fully initialized endpoint.
2684        endpoint.addr();
2685        let t0 = Instant::now();
2686        for id in &ids {
2687            let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2688            conn.close(0u32.into(), b"done");
2689        }
2690        let dt0 = t0.elapsed().as_secs_f64();
2691        let t1 = Instant::now();
2692        for id in &ids {
2693            let conn = endpoint.connect(*id, NOOP_ALPN).await?;
2694            conn.close(0u32.into(), b"done");
2695        }
2696        let dt1 = t1.elapsed().as_secs_f64();
2697
2698        assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
2699        Ok(())
2700    }
2701
2702    #[tokio::test]
2703    async fn test_custom_relay() -> Result {
2704        let _ep = Endpoint::empty_builder(RelayMode::custom([RelayUrl::from_str(
2705            "https://use1-1.relay.n0.iroh-canary.iroh.link.",
2706        )?]))
2707        .bind()
2708        .await?;
2709
2710        let relays = RelayMap::try_from_iter([
2711            "https://use1-1.relay.n0.iroh.iroh.link/",
2712            "https://euc1-1.relay.n0.iroh.iroh.link/",
2713        ])?;
2714        let _ep = Endpoint::empty_builder(RelayMode::Custom(relays))
2715            .bind()
2716            .await?;
2717
2718        Ok(())
2719    }
2720
2721    /// Testing bind_addr: Clear IP transports and add single IPv4 bind
2722    #[tokio::test]
2723    #[traced_test]
2724    async fn test_bind_addr_clear() -> Result {
2725        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2726            .clear_ip_transports()
2727            .bind_addr((Ipv4Addr::LOCALHOST, 0))?
2728            .bind()
2729            .await?;
2730        let bound_sockets = ep.bound_sockets();
2731        assert_eq!(bound_sockets.len(), 1);
2732        assert_eq!(bound_sockets[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2733        ep.close().await;
2734        Ok(())
2735    }
2736
2737    /// Testing bind_addr: Do not clear IP transports and add single non-default IPv4 bind
2738    ///
2739    /// This will bind two sockets: default wildcard bind for IPv6, and our
2740    /// manually-added IPv4 bind.
2741    #[tokio::test]
2742    #[traced_test]
2743    async fn test_bind_addr_no_clear() -> Result {
2744        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2745            .bind_addr((Ipv4Addr::LOCALHOST, 0))?
2746            .bind()
2747            .await?;
2748        let bound_sockets = ep.bound_sockets();
2749        assert_eq!(bound_sockets.len(), 2);
2750        assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv4()).count(), 1);
2751        assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv6()).count(), 1);
2752        // Test that our manually added socket is there
2753        assert!(
2754            bound_sockets
2755                .iter()
2756                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2757        );
2758        ep.close().await;
2759        Ok(())
2760    }
2761
2762    // Testing bind_addr: Do not clear IP transports and add single default IPv4 bind.
2763    //
2764    // This replaces the default IPv4 bind added by the builder,
2765    // but keeps the default wildcard IPv6 bind.
2766    #[tokio::test]
2767    #[traced_test]
2768    async fn test_bind_addr_default() -> Result {
2769        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2770            .bind_addr_with_opts(
2771                (Ipv4Addr::LOCALHOST, 0),
2772                BindOpts::default().set_is_default_route(true),
2773            )?
2774            .bind()
2775            .await?;
2776        let bound_sockets = ep.bound_sockets();
2777        assert_eq!(bound_sockets.len(), 2);
2778        assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv4()).count(), 1);
2779        assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv6()).count(), 1);
2780        assert!(
2781            bound_sockets
2782                .iter()
2783                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2784        );
2785        ep.close().await;
2786        drop(ep);
2787
2788        Ok(())
2789    }
2790
2791    /// Testing bind_addr: Do not clear IP transports and add single IPv4 bind with a non-zero prefix len
2792    ///
2793    /// This will bind three sockets: default wildcard bind for IPv4 and IPv6, and our
2794    /// manually-added IPv4 bind.
2795    #[tokio::test]
2796    #[traced_test]
2797    async fn test_bind_addr_nonzero_prefix() -> Result {
2798        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2799            .bind_addr_with_opts(
2800                (Ipv4Addr::LOCALHOST, 0),
2801                BindOpts::default().set_prefix_len(32),
2802            )?
2803            .bind()
2804            .await?;
2805        let bound_sockets = ep.bound_sockets();
2806        assert_eq!(bound_sockets.len(), 3);
2807        assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv4()).count(), 2);
2808        assert_eq!(bound_sockets.iter().filter(|x| x.is_ipv6()).count(), 1);
2809        // Test that the default wildcard socket is there
2810        assert!(
2811            bound_sockets
2812                .iter()
2813                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED))
2814        );
2815        // Test that our manually added socket is there
2816        assert!(
2817            bound_sockets
2818                .iter()
2819                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2820        );
2821        ep.close().await;
2822        Ok(())
2823    }
2824
2825    /// Bind on an unusable port with the default opts.
2826    ///
2827    /// Binding the endpoint fails with an AddrInUse error.
2828    #[tokio::test]
2829    #[traced_test]
2830    async fn test_bind_addr_badport() -> Result {
2831        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2832        let port = socket.local_addr()?.port();
2833
2834        let res = Endpoint::empty_builder(RelayMode::Disabled)
2835            .clear_ip_transports()
2836            .bind_addr((Ipv4Addr::LOCALHOST, port))?
2837            .bind()
2838            .await;
2839
2840        assert!(matches!(
2841            res,
2842            Err(BindError::Sockets {
2843                source: io_error,
2844                ..
2845            })
2846            if io_error.kind() == io::ErrorKind::AddrInUse
2847        ));
2848        Ok(())
2849    }
2850
2851    /// Bind a non-default route on an unusable port, but set is_required = false.
2852    ///
2853    /// Binding the endpoint succeeds.
2854    #[tokio::test]
2855    #[traced_test]
2856    async fn test_bind_addr_badport_notrequired() -> Result {
2857        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2858        let port = socket.local_addr()?.port();
2859
2860        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2861            .bind_addr_with_opts(
2862                (Ipv4Addr::LOCALHOST, port),
2863                BindOpts::default()
2864                    .set_prefix_len(32)
2865                    .set_is_required(false),
2866            )?
2867            .bind()
2868            .await?;
2869        let bound_sockets = ep.bound_sockets();
2870        // just the default wildcard binds
2871        assert_eq!(bound_sockets.len(), 2);
2872        // our requested bind addr is not included because it failed to bind
2873        assert!(
2874            !bound_sockets
2875                .iter()
2876                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2877        );
2878        Ok(())
2879    }
2880
2881    /// Bind on a default route on an unusable port, but set is_required = false.
2882    ///
2883    /// Binding the endpoint succeeds.
2884    #[tokio::test]
2885    #[traced_test]
2886    async fn test_bind_addr_badport_default_notrequired() -> Result {
2887        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2888        let port = socket.local_addr()?.port();
2889
2890        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2891            .bind_addr_with_opts(
2892                (Ipv4Addr::LOCALHOST, port),
2893                BindOpts::default().set_is_required(false),
2894            )?
2895            .bind()
2896            .await?;
2897        let bound_sockets = ep.bound_sockets();
2898        // just the IPv6 default, but no IPv4 bind at all because we replaced the default
2899        // with a bind with an unusable port and set it to not be required.
2900        assert_eq!(bound_sockets.len(), 1);
2901        assert!(bound_sockets[0].is_ipv6());
2902        Ok(())
2903    }
2904
2905    /// Bind on an unusable port, with is_required = false, and no other transports.
2906    ///
2907    /// Binding the endpoint fails with "no valid address available".
2908    #[tokio::test]
2909    #[traced_test]
2910    async fn test_bind_addr_badport_notrequired_no_other_transports() -> Result {
2911        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
2912        let port = socket.local_addr()?.port();
2913
2914        let res = Endpoint::empty_builder(RelayMode::Disabled)
2915            .clear_ip_transports()
2916            .bind_addr_with_opts(
2917                (Ipv4Addr::LOCALHOST, port),
2918                BindOpts::default().set_is_required(false),
2919            )?
2920            .bind()
2921            .await;
2922
2923        assert!(matches!(
2924            res,
2925            Err(BindError::CreateQuicEndpoint {
2926                source: io_error,
2927                ..
2928            })
2929            if io_error.kind() == io::ErrorKind::Other && io_error.to_string() == "no valid address available"
2930        ));
2931        Ok(())
2932    }
2933
2934    /// Bind with prefix len 0 but set the route as non-default.
2935    #[tokio::test]
2936    #[traced_test]
2937    async fn test_bind_addr_prefix_len_0_not_default() -> Result {
2938        let ep = Endpoint::empty_builder(RelayMode::Disabled)
2939            .bind_addr_with_opts(
2940                (Ipv4Addr::LOCALHOST, 0),
2941                BindOpts::default().set_is_default_route(false),
2942            )?
2943            .bind()
2944            .await?;
2945        let bound_sockets = ep.bound_sockets();
2946        // The two default wildcard binds plus our additional route (which does not replace the default route
2947        // because we set is_default_route to false explicitly).
2948        assert_eq!(bound_sockets.len(), 3);
2949        assert!(
2950            bound_sockets
2951                .iter()
2952                .any(|x| x.ip() == IpAddr::V6(Ipv6Addr::UNSPECIFIED))
2953        );
2954        assert!(
2955            bound_sockets
2956                .iter()
2957                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED))
2958        );
2959        assert!(
2960            bound_sockets
2961                .iter()
2962                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
2963        );
2964        Ok(())
2965    }
2966
2967    #[ignore = "flaky"]
2968    #[tokio::test]
2969    #[traced_test]
2970    async fn connect_via_relay_becomes_direct_and_sends_direct() -> Result {
2971        let (relay_map, relay_url, _relay_server_guard) = run_relay_server().await?;
2972        let qlog = Arc::new(QlogFileGroup::from_env(
2973            "connect_via_relay_becomes_direct_and_sends_direct",
2974        ));
2975        let transfer_size = 1_000_000;
2976
2977        async fn collect_path_infos(conn: Connection) -> BTreeMap<TransportAddr, PathInfo> {
2978            let mut path_infos = BTreeMap::new();
2979            let mut paths = conn.paths().stream();
2980            while let Some(path_list) = paths.next().await {
2981                for path in path_list {
2982                    path_infos.insert(path.remote_addr().clone(), path);
2983                }
2984            }
2985            path_infos
2986        }
2987
2988        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
2989            .insecure_skip_relay_cert_verify(true)
2990            .transport_config(qlog.create("client")?)
2991            .bind()
2992            .await?;
2993        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
2994            .insecure_skip_relay_cert_verify(true)
2995            .transport_config(qlog.create("server")?)
2996            .alpns(vec![TEST_ALPN.to_vec()])
2997            .bind()
2998            .await?;
2999        let server_addr = EndpointAddr::new(server.id()).with_relay_url(relay_url);
3000        let server_task = tokio::spawn(async move {
3001            let incoming = server.accept().await.anyerr()?;
3002            let conn = incoming.await.anyerr()?;
3003            let stats_task = AbortOnDropHandle::new(tokio::spawn(collect_path_infos(conn.clone())));
3004            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
3005            let msg = recv.read_to_end(transfer_size).await.anyerr()?;
3006            send.write_all(&msg).await.anyerr()?;
3007            send.finish().anyerr()?;
3008            conn.closed().await;
3009            let stats = stats_task.await.std_context("server stats task failed")?;
3010            Ok::<_, Error>(stats)
3011        });
3012
3013        let conn = client.connect(server_addr, TEST_ALPN).await?;
3014        let stats_task = AbortOnDropHandle::new(tokio::spawn(collect_path_infos(conn.clone())));
3015        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
3016        send.write_all(&vec![42u8; transfer_size]).await.anyerr()?;
3017        send.finish().anyerr()?;
3018        recv.read_to_end(transfer_size).await.anyerr()?;
3019        conn.close(0u32.into(), b"thanks, bye!");
3020        client.close().await;
3021        let client_stats = stats_task.await.std_context("client stats task failed")?;
3022        let server_stats = server_task.await.anyerr()??;
3023
3024        info!("client stats: {client_stats:#?}");
3025        info!("server stats: {server_stats:#?}");
3026
3027        let client_total_relay_tx = client_stats
3028            .values()
3029            .filter(|p| p.is_relay())
3030            .map(|p| p.stats().udp_tx.bytes)
3031            .sum::<u64>();
3032        let client_total_relay_rx = client_stats
3033            .values()
3034            .filter(|p| p.is_relay())
3035            .map(|p| p.stats().udp_rx.bytes)
3036            .sum::<u64>();
3037        let server_total_relay_tx = server_stats
3038            .values()
3039            .filter(|p| p.is_relay())
3040            .map(|p| p.stats().udp_tx.bytes)
3041            .sum::<u64>();
3042        let server_total_relay_rx = server_stats
3043            .values()
3044            .filter(|p| p.is_relay())
3045            .map(|p| p.stats().udp_rx.bytes)
3046            .sum::<u64>();
3047
3048        info!(?client_total_relay_tx, "total");
3049        info!(?client_total_relay_rx, "total");
3050        info!(?server_total_relay_tx, "total");
3051        info!(?server_total_relay_rx, "total");
3052
3053        // We should send/receive only the minorty of traffic via the relay.
3054        assert!(client_total_relay_tx < transfer_size as u64 / 2);
3055        assert!(client_total_relay_rx < transfer_size as u64 / 2);
3056        assert!(server_total_relay_tx < transfer_size as u64 / 2);
3057        assert!(server_total_relay_rx < transfer_size as u64 / 2);
3058
3059        Ok(())
3060    }
3061}