Skip to main content

iroh_relay/
server.rs

1//! A fully-fledged iroh-relay server over HTTP or HTTPS.
2//!
//! This module provides an API to run a fully-fledged iroh-relay server.  It is primarily
//! used by the `iroh-relay` binary in this crate, but it can also be used to run a relay
//! server elsewhere.
6//!
//! This code is written entirely in a structured-concurrency style: every spawned task is
//! always attached to a handle, and when the handle is dropped the tasks abort, so tasks
//! cannot outlive their handle.  It is also always possible to await the completion of a
//! task.  Some tasks additionally offer a method for graceful shutdown.
11//!
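//! The relay server hosts the following services:
//!
//! - HTTPS `/relay`: The main URL endpoint to which clients connect and over which they
//!   send traffic.
//! - HTTPS `/ping`: Used for net_report latency probes.
//! - HTTP `/generate_204`: Used for net_report captive-portal detection.  When TLS is
//!   enabled this is served by a separate plain-HTTP listener, because the captive-portal
//!   probe has to run over plain-text HTTP.
//! - HTTPS `/healthz`: A JSON health check.
//! - HTTPS `/`, `/index.html` and `/robots.txt`: Static informational pages.
//!
//! Services listed as HTTPS are served over plain HTTP when TLS is not configured.  A QUIC
//! (QAD) server and a metrics server can optionally run alongside.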
17
18use std::{fmt, future::Future, net::SocketAddr, num::NonZeroU32, pin::Pin, sync::Arc};
19
20use derive_more::Debug;
21use http::{
22    HeaderMap, HeaderValue, Method, Request, Response, StatusCode, header::InvalidHeaderValue,
23    response::Builder as ResponseBuilder,
24};
25use hyper::body::Incoming;
26use iroh_base::EndpointId;
27#[cfg(feature = "test-utils")]
28use iroh_base::RelayUrl;
29use n0_error::{e, stack_error};
30use n0_future::{StreamExt, future::Boxed};
31use serde::Serialize;
32use tokio::{
33    net::TcpListener,
34    task::{JoinError, JoinSet},
35};
36use tokio_util::task::AbortOnDropHandle;
37use tracing::{Instrument, debug, error, info, info_span, instrument};
38
39use crate::{
40    defaults::DEFAULT_KEY_CACHE_CAPACITY,
41    http::RELAY_PROBE_PATH,
42    quic::server::{QuicServer, QuicSpawnError, ServerHandle as QuicServerHandle},
43};
44
45pub mod client;
46pub mod clients;
47pub mod http_server;
48mod metrics;
49pub(crate) mod resolver;
50pub mod streams;
51#[cfg(feature = "test-utils")]
52pub mod testing;
53
54pub use self::{
55    http_server::{Handlers, RelayService},
56    metrics::{Metrics, RelayMetrics},
57    resolver::{DEFAULT_CERT_RELOAD_INTERVAL, ReloadingResolver},
58};
59
/// Request header in which a client may send a captive-portal challenge token.
const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Iroh-Challenge";
/// Response header in which a valid challenge is echoed back as `response <challenge>`.
const NO_CONTENT_RESPONSE_HEADER: &str = "X-Iroh-Response";
/// Body of the `404 Not Found` responses sent by [`CaptivePortalService`].
const NOTFOUND: &[u8] = b"Not Found";
/// `robots.txt` asking every crawler to stay away from the whole site.
const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n";
// NOTE(review): `<body>` and `<html>` are never closed; browsers tolerate this, but closing
// them would make the page well-formed.
/// HTML landing page served on `/` and `/index.html`.
const INDEX: &[u8] = br#"<html><body>
<h1>Iroh Relay</h1>
<p>
  This is an <a href="https://iroh.computer/">Iroh</a> Relay server.
</p>
"#;
// NOTE(review): per MDN, the CSP directives `block-all-mixed-content` and `plugin-types`
// are deprecated and ignored by current browsers; harmless, but candidates for removal.
/// Security headers handed to the Relay HTTP server builder in [`Server::spawn`].
///
/// They are passed to the builder whether or not TLS is configured; presumably the builder
/// attaches them to its responses — confirm in `http_server`.
const TLS_HEADERS: [(&str, &str); 2] = [
    (
        "Strict-Transport-Security",
        "max-age=63072000; includeSubDomains",
    ),
    (
        "Content-Security-Policy",
        "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'",
    ),
];

/// Response body type used by every handler in this module: one in-memory chunk of bytes.
type BytesBody = http_body_util::Full<hyper::body::Bytes>;
/// Boxed, thread-safe error type returned by the request handlers.
type HyperError = Box<dyn std::error::Error + Send + Sync>;
/// Result alias used by the request handlers.
type HyperResult<T> = std::result::Result<T, HyperError>;
84
85/// Creates a new [`BytesBody`] with no content.
86fn body_empty() -> BytesBody {
87    http_body_util::Full::new(hyper::body::Bytes::new())
88}
89
/// Configuration for the full Relay.
///
/// Every service is optional; a field set to `None` disables that service.  Spawning a
/// [`Server`] with all services disabled (including metrics) succeeds, but its supervisor
/// task then finishes right away with [`SupervisorError::NoRelayServicesEnabled`].
///
/// Be aware the generic parameters are for when using the Let's Encrypt TLS configuration.
/// If not used, dummy ones need to be provided, e.g. `ServerConfig::<(), ()>::default()`.
#[derive(Debug, Default)]
pub struct ServerConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// Configuration for the Relay server, disabled if `None`.
    pub relay: Option<RelayConfig<EC, EA>>,
    /// Configuration for the QUIC server, disabled if `None`.
    pub quic: Option<QuicConfig>,
    /// Socket to serve metrics on, disabled if `None`.
    ///
    /// Only present with the `metrics` feature.
    #[cfg(feature = "metrics")]
    pub metrics_addr: Option<SocketAddr>,
}
104
/// Configuration for the Relay HTTP and HTTPS server.
///
/// This includes the HTTP services hosted by the Relay server; the Relay `/relay` HTTP
/// endpoint is only one of the services served.
#[derive(Debug)]
pub struct RelayConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// The socket address on which the Relay HTTP server should bind.
    ///
    /// Normally you'd choose port `80`.  The bind address for the HTTPS server is
    /// configured in [`RelayConfig::tls`].
    ///
    /// If [`RelayConfig::tls`] is `None` then this serves all the HTTP services without
    /// TLS.  Otherwise only the plain-HTTP captive-portal service runs on this address.
    pub http_bind_addr: SocketAddr,
    /// TLS configuration for the HTTPS server.
    ///
    /// If `None`, all the HTTP services that would be served here are served from
    /// [`RelayConfig::http_bind_addr`].
    pub tls: Option<TlsConfig<EC, EA>>,
    /// Rate limits.
    ///
    /// Only [`Limits::client_rx`] is currently applied by [`Server::spawn`].
    pub limits: Limits,
    /// Key cache capacity.
    ///
    /// Falls back to [`DEFAULT_KEY_CACHE_CAPACITY`] when `None`.
    pub key_cache_capacity: Option<usize>,
    /// Access configuration: which endpoints may use the relay.
    pub access: AccessConfig,
}
131
/// Controls which endpoints are allowed to use the relay.
#[derive(derive_more::Debug)]
pub enum AccessConfig {
    /// Every endpoint is allowed.
    Everyone,
    /// Only endpoints for which the function returns `Access::Allow`.
    ///
    /// The function returns a boxed future, so the decision may be made asynchronously.
    // The closure itself is not `Debug`, so it is rendered as a fixed placeholder.
    #[debug("restricted")]
    Restricted(Box<dyn Fn(EndpointId) -> Boxed<Access> + Send + Sync + 'static>),
}
141
142impl AccessConfig {
143    /// Is this endpoint allowed?
144    pub async fn is_allowed(&self, endpoint: EndpointId) -> bool {
145        match self {
146            Self::Everyone => true,
147            Self::Restricted(check) => {
148                let res = check(endpoint).await;
149                matches!(res, Access::Allow)
150            }
151        }
152    }
153}
154
/// Access decision for an endpoint, as returned by an [`AccessConfig::Restricted`] check.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Access {
    /// Access is allowed.
    Allow,
    /// Access is denied.
    Deny,
}
163
/// Configuration for the QUIC server.
#[derive(Debug)]
pub struct QuicConfig {
    /// The socket address on which the QUIC server should bind.
    ///
    /// Normally you'd choose port `7842`, see [`crate::defaults::DEFAULT_RELAY_QUIC_PORT`].
    pub bind_addr: SocketAddr,
    /// The TLS server configuration for the QUIC server.
    ///
    /// If this [`rustls::ServerConfig`] does not support TLS 1.3, the QUIC server will fail
    /// to spawn.
    pub server_config: rustls::ServerConfig,
}
177
/// TLS configuration for the Relay server.
///
/// Normally the Relay server accepts connections on both HTTPS and HTTP.
#[derive(Debug)]
pub struct TlsConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// The socket address on which to serve the HTTPS server.
    ///
    /// Since the captive portal probe has to run over plain text HTTP and TLS is used for
    /// the main relay server, this has to be on a different port.  When TLS is not enabled
    /// this is served on the [`RelayConfig::http_bind_addr`] socket address.
    ///
    /// Normally you'd choose port `443`.
    pub https_bind_addr: SocketAddr,
    /// The socket address on which to serve the QUIC server, if QUIC is enabled.
    ///
    /// NOTE(review): [`Server::spawn`] does not read this field; the QUIC server binds to
    /// [`QuicConfig::bind_addr`] instead.
    pub quic_bind_addr: SocketAddr,
    /// Mode for getting a certificate.
    pub cert: CertConfig<EC, EA>,
    /// The rustls server configuration used for the HTTPS listener.
    pub server_config: rustls::ServerConfig,
}
198
/// Rate limits.
///
/// Only [`Limits::client_rx`] is currently enforced.
// TODO: accept_conn_limit and accept_conn_burst are not currently implemented.
#[derive(Debug, Default)]
pub struct Limits {
    /// Rate limit for accepting new connections. Unlimited if not set.
    ///
    /// Not enforced yet, see the TODO above.
    pub accept_conn_limit: Option<f64>,
    /// Burst limit for accepting new connections. Unlimited if not set.
    ///
    /// Not enforced yet, see the TODO above.
    pub accept_conn_burst: Option<usize>,
    /// Rate limits for incoming traffic from a client connection. Unlimited if not set.
    pub client_rx: Option<ClientRateLimit>,
}
210
/// Per-client rate limit configuration.
///
/// Applied to the data read from each client connection.
#[derive(Debug, Copy, Clone)]
pub struct ClientRateLimit {
    /// Max number of bytes per second to read from the client connection.
    pub bytes_per_second: NonZeroU32,
    /// Max number of bytes to read in a single burst.
    ///
    /// NOTE(review): the behaviour when `None` is decided by the rate limiter in
    /// `http_server`; confirm the default there.
    pub max_burst_bytes: Option<NonZeroU32>,
}
219
/// TLS certificate configuration.
#[derive(derive_more::Debug)]
pub enum CertConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// Use Let's Encrypt.
    ///
    /// [`Server::spawn`] drives the ACME state in a background task for the lifetime of the
    /// server.
    LetsEncrypt {
        /// State for Let's Encrypt certificates.
        #[debug("AcmeConfig")]
        state: tokio_rustls_acme::AcmeState<EC, EA>,
    },
    /// Use a static TLS key and certificate chain.
    ///
    /// The TLS handshake uses [`TlsConfig::server_config`].  The chain stored here is only
    /// reported back through [`Server::certificates`], so it should match the one configured
    /// there.
    Manual {
        /// The TLS certificate chain.
        certs: Vec<rustls::pki_types::CertificateDer<'static>>,
    },
    /// Use a TLS key and certificate chain that can be reloaded.
    ///
    /// Certificates come from [`TlsConfig::server_config`]; presumably its certificate
    /// resolver is a [`ReloadingResolver`] — confirm with callers.
    Reloading,
}
237
/// A running Relay + QAD server.
///
/// This is a full Relay server, including QAD, Relay and various associated HTTP services.
///
/// Dropping this will stop the server: the supervisor task is held in an
/// [`AbortOnDropHandle`].
#[derive(Debug)]
pub struct Server {
    /// The address of the HTTP server, if configured.
    http_addr: Option<SocketAddr>,
    /// The address of the HTTPS server, if the relay server is using TLS.
    ///
    /// If the Relay server is not using TLS then it is served from the
    /// [`Server::http_addr`].
    https_addr: Option<SocketAddr>,
    /// The address of the QUIC server, if configured.
    quic_addr: Option<SocketAddr>,
    /// Handle to the relay server, used to request graceful shutdown.
    relay_handle: Option<http_server::ServerHandle>,
    /// Handle to the QUIC server, used to request graceful shutdown.
    quic_handle: Option<QuicServerHandle>,
    /// The main task running the server.
    supervisor: AbortOnDropHandle<Result<(), SupervisorError>>,
    /// The certificate for the server.
    ///
    /// If the server has manual certificates configured the certificate chain will be
    /// available here; this can be used by a client to authenticate the server.
    certificates: Option<Vec<rustls::pki_types::CertificateDer<'static>>>,
    /// Metrics shared with the relay and QUIC servers.
    metrics: RelayMetrics,
}
267
/// Errors returned by [`Server::spawn`].
#[allow(missing_docs)]
#[stack_error(derive, add_meta, std_sources)]
#[non_exhaustive]
pub enum SpawnError {
    // NOTE(review): not constructed in this file; presumably raised by `http_server`.
    #[error("Unable to get local address")]
    LocalAddr { source: std::io::Error },
    /// The QUIC (QAD) server failed to spawn.
    #[error("Failed to bind QAD listener")]
    QuicSpawn { source: QuicSpawnError },
    /// One of the `TLS_HEADERS` values is not a valid header value.
    #[error("Failed to parse TLS header")]
    TlsHeaderParse { source: InvalidHeaderValue },
    /// Binding the plain-HTTP captive-portal listener failed (only used when TLS is enabled,
    /// despite the variant name).
    #[error("Failed to bind TcpListener")]
    BindTlsListener { source: std::io::Error },
    /// The plain-HTTP captive-portal listener could not report its local address.
    #[error("No local address")]
    NoLocalAddr { source: std::io::Error },
    // NOTE(review): not constructed in this file; presumably raised by `http_server`.
    #[error("Failed to bind server socket to {addr}")]
    BindTcpListener {
        source: std::io::Error,
        addr: SocketAddr,
    },
}
289
/// Errors with which the server's supervisor task can finish.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum SupervisorError {
    /// The metrics HTTP server failed.
    #[error("Error starting metrics server")]
    Metrics {
        #[error(std_err)]
        source: std::io::Error,
    },
    /// The Let's Encrypt ACME event stream ended, which stops the server.
    #[error("Acme event stream finished")]
    AcmeEventStreamFinished {},
    /// Awaiting the supervisor task itself failed (see [`Server::shutdown`]).
    #[error(transparent)]
    JoinError {
        #[error(from, std_err)]
        source: JoinError,
    },
    /// No service at all was configured, so there was nothing to supervise.
    #[error("No relay services are enabled")]
    NoRelayServicesEnabled {},
    /// A supervised task was cancelled rather than finishing or panicking.
    #[error("Task cancelled")]
    TaskCancelled {},
}
312
impl Server {
    /// Starts the server.
    ///
    /// Spawns every service enabled in `config`:
    ///
    /// - the metrics HTTP server, if the `metrics` feature is enabled and `metrics_addr` is
    ///   set;
    /// - the QUIC server, if [`ServerConfig::quic`] is set;
    /// - the Relay HTTP(S) server, if [`ServerConfig::relay`] is set.  With TLS, a separate
    ///   plain-HTTP captive-portal service is also bound on [`RelayConfig::http_bind_addr`],
    ///   and with Let's Encrypt an ACME event task is spawned as well.
    ///
    /// All of them are watched by a single supervisor task; see [`Server::task_handle`].
    ///
    /// # Errors
    ///
    /// Returns a [`SpawnError`] if the QUIC server fails to spawn, a TLS header value fails
    /// to parse, the plain-HTTP listener cannot be bound or report its local address, or the
    /// Relay HTTP server fails to spawn.
    pub async fn spawn<EC, EA>(config: ServerConfig<EC, EA>) -> Result<Self, SpawnError>
    where
        EC: fmt::Debug + 'static,
        EA: fmt::Debug + 'static,
    {
        // Auxiliary tasks (metrics, ACME, captive portal); handed to the supervisor below.
        let mut tasks = JoinSet::new();

        let metrics = RelayMetrics::default();

        #[cfg(feature = "metrics")]
        if let Some(addr) = config.metrics_addr {
            debug!("Starting metrics server");
            let mut registry = iroh_metrics::Registry::default();
            registry.register_all(&metrics);
            tasks.spawn(
                async move {
                    iroh_metrics::service::start_metrics_server(addr, Arc::new(registry))
                        .await
                        .map_err(|err| e!(SupervisorError::Metrics, err))
                }
                .instrument(info_span!("metrics-server")),
            );
        }

        // Start the Relay server, but first clone the certs out.  Only `CertConfig::Manual`
        // carries a chain to report; every other mode yields `None`.
        let certificates = config.relay.as_ref().and_then(|relay| {
            relay.tls.as_ref().and_then(|tls| match tls.cert {
                CertConfig::LetsEncrypt { .. } => None,
                CertConfig::Manual { ref certs, .. } => Some(certs.clone()),
                CertConfig::Reloading => None,
            })
        });

        let quic_server = match config.quic {
            Some(quic_config) => {
                debug!("Starting QUIC server {}", quic_config.bind_addr);
                Some(
                    QuicServer::spawn(quic_config, metrics.server.clone())
                        .map_err(|err| e!(SpawnError::QuicSpawn, err))?,
                )
            }
            None => None,
        };
        let quic_addr = quic_server.as_ref().map(|srv| srv.bind_addr());
        let quic_handle = quic_server.as_ref().map(|srv| srv.handle());

        let (relay_server, http_addr) = match config.relay {
            Some(relay_config) => {
                debug!("Starting Relay server");
                // These headers are handed to the builder whether or not TLS is configured.
                let mut headers = HeaderMap::new();
                for (name, value) in TLS_HEADERS.iter() {
                    headers.insert(
                        *name,
                        value
                            .parse()
                            .map_err(|err| e!(SpawnError::TlsHeaderParse, err))?,
                    );
                }
                // With TLS the Relay server itself listens on the HTTPS address; without TLS
                // it listens on the plain HTTP address.
                let relay_bind_addr = match relay_config.tls {
                    Some(ref tls) => tls.https_bind_addr,
                    None => relay_config.http_bind_addr,
                };
                let key_cache_capacity = relay_config
                    .key_cache_capacity
                    .unwrap_or(DEFAULT_KEY_CACHE_CAPACITY);
                let mut builder = http_server::ServerBuilder::new(relay_bind_addr)
                    .metrics(metrics.server.clone())
                    .headers(headers)
                    .key_cache_capacity(key_cache_capacity)
                    .access(relay_config.access)
                    .request_handler(Method::GET, "/", Box::new(root_handler))
                    .request_handler(Method::GET, "/index.html", Box::new(root_handler))
                    .request_handler(Method::GET, RELAY_PROBE_PATH, Box::new(probe_handler))
                    .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler))
                    .request_handler(Method::GET, "/healthz", Box::new(healthz_handler));
                // Only `client_rx` is applied; `accept_conn_limit`/`accept_conn_burst` are not
                // read here (see the TODO on `Limits`).
                if let Some(cfg) = relay_config.limits.client_rx {
                    builder = builder.client_rx_ratelimit(cfg);
                }
                // `http_addr` is the address of the separate plain-HTTP listener, which only
                // exists when TLS is configured.
                let http_addr = match relay_config.tls {
                    Some(tls_config) => {
                        let server_tls_config = match tls_config.cert {
                            CertConfig::LetsEncrypt { mut state } => {
                                let acceptor =
                                    http_server::TlsAcceptor::LetsEncrypt(state.acceptor());
                                // Drive the ACME state machine.  This task only returns once
                                // the event stream ends, and then with an error, which makes
                                // the supervisor stop the whole server.
                                tasks.spawn(
                                    async move {
                                        while let Some(event) = state.next().await {
                                            match event {
                                                Ok(ok) => debug!("acme event: {ok:?}"),
                                                Err(err) => error!("error: {err:?}"),
                                            }
                                        }
                                        Err(e!(SupervisorError::AcmeEventStreamFinished))
                                    }
                                    .instrument(info_span!("acme")),
                                );
                                Some(http_server::TlsConfig {
                                    config: Arc::new(tls_config.server_config),
                                    acceptor,
                                })
                            }
                            // Both modes take their certificates from `server_config`; the
                            // `certs` of `Manual` are only reported via `certificates`.
                            CertConfig::Manual { .. } | CertConfig::Reloading => {
                                let server_config = Arc::new(tls_config.server_config);
                                let acceptor =
                                    tokio_rustls::TlsAcceptor::from(server_config.clone());
                                let acceptor = http_server::TlsAcceptor::Manual(acceptor);
                                Some(http_server::TlsConfig {
                                    config: server_config,
                                    acceptor,
                                })
                            }
                        };
                        builder = builder.tls_config(server_tls_config);

                        // Some services always need to be served over HTTP without TLS.  Run
                        // these standalone.
                        let http_listener = TcpListener::bind(&relay_config.http_bind_addr)
                            .await
                            .map_err(|err| e!(SpawnError::BindTlsListener, err))?;
                        let http_addr = http_listener
                            .local_addr()
                            .map_err(|err| e!(SpawnError::NoLocalAddr, err))?;
                        tasks.spawn(
                            async move {
                                run_captive_portal_service(http_listener).await;
                                Ok(())
                            }
                            .instrument(info_span!("http-service", addr = %http_addr)),
                        );
                        Some(http_addr)
                    }
                    None => {
                        // If running Relay without TLS add the plain HTTP server directly
                        // to the Relay server.
                        builder = builder.request_handler(
                            Method::GET,
                            "/generate_204",
                            Box::new(serve_no_content_handler),
                        );
                        None
                    }
                };
                let relay_server = builder.spawn().await?;
                (Some(relay_server), http_addr)
            }
            None => (None, None),
        };
        // If http_addr is Some then relay_server is serving HTTPS.  If http_addr is None
        // relay_server is serving HTTP, including the /generate_204 service.
        let relay_addr = relay_server.as_ref().map(|srv| srv.addr());
        let relay_handle = relay_server.as_ref().map(|srv| srv.handle());
        let task = tokio::spawn(relay_supervisor(tasks, relay_server, quic_server));

        Ok(Self {
            // Without TLS the relay server itself is the HTTP server.
            http_addr: http_addr.or(relay_addr),
            // With TLS (a standalone HTTP listener exists) the relay server is the HTTPS one.
            https_addr: http_addr.and(relay_addr),
            quic_addr,
            relay_handle,
            quic_handle,
            supervisor: AbortOnDropHandle::new(task),
            certificates,
            metrics,
        })
    }

    /// Requests graceful shutdown.
    ///
    /// Returns once all server tasks have stopped.
    ///
    /// # Errors
    ///
    /// Returns the supervisor's own result.  If awaiting the supervisor task fails, the
    /// [`JoinError`] is reported as [`SupervisorError::JoinError`].
    pub async fn shutdown(self) -> Result<(), SupervisorError> {
        // Only the Relay server and QUIC server need shutting down, the supervisor will abort the tasks in
        // the JoinSet when the server terminates.
        if let Some(handle) = self.relay_handle {
            handle.shutdown();
        }
        if let Some(handle) = self.quic_handle {
            handle.shutdown();
        }
        self.supervisor.await?
    }

    /// Returns the handle for the task.
    ///
    /// This allows waiting for the server's supervisor task to finish.  Can be useful in
    /// case there is an error in the server before it is shut down.
    pub fn task_handle(&mut self) -> &mut AbortOnDropHandle<Result<(), SupervisorError>> {
        &mut self.supervisor
    }

    /// The socket address the HTTPS server is listening on, `None` if TLS is not used.
    pub fn https_addr(&self) -> Option<SocketAddr> {
        self.https_addr
    }

    /// The socket address the HTTP server is listening on, `None` if no relay is configured.
    pub fn http_addr(&self) -> Option<SocketAddr> {
        self.http_addr
    }

    /// The socket address the QUIC server is listening on.
    pub fn quic_addr(&self) -> Option<SocketAddr> {
        self.quic_addr
    }

    /// The certificate chain, if configured with manual TLS certificates.
    ///
    /// Returns a clone of the stored chain.
    pub fn certificates(&self) -> Option<Vec<rustls::pki_types::CertificateDer<'static>>> {
        self.certificates.clone()
    }

    /// Get the server's https [`RelayUrl`].
    ///
    /// This uses [`Self::https_addr`] so it's mostly useful for local development.
    #[cfg(feature = "test-utils")]
    pub fn https_url(&self) -> Option<RelayUrl> {
        self.https_addr.map(|addr| {
            url::Url::parse(&format!("https://{addr}"))
                .expect("valid url")
                .into()
        })
    }

    /// Get the server's http [`RelayUrl`].
    ///
    /// This uses [`Self::http_addr`] so it's mostly useful for local development.
    #[cfg(feature = "test-utils")]
    pub fn http_url(&self) -> Option<RelayUrl> {
        self.http_addr.map(|addr| {
            url::Url::parse(&format!("http://{addr}"))
                .expect("valid url")
                .into()
        })
    }

    /// Returns the metrics collected in the relay server.
    pub fn metrics(&self) -> &RelayMetrics {
        &self.metrics
    }
}
552
/// Supervisor for the relay server tasks.
///
/// Waits until the first of the supervised units finishes: any task in `tasks`, the QUIC
/// server, or the Relay HTTP server.  It then shuts down both servers and all remaining
/// tasks, and returns the first unit's outcome:
///
/// - `Ok(())` if it exited cleanly;
/// - its error if it failed;
/// - [`SupervisorError::TaskCancelled`] if it was cancelled;
/// - [`SupervisorError::NoRelayServicesEnabled`] if there was nothing to supervise at all.
///
/// A panic in a supervised unit is re-raised on this task via `resume_unwind`, in which
/// case the explicit cleanup below is skipped and the servers are only dropped.
#[instrument(skip_all)]
async fn relay_supervisor(
    mut tasks: JoinSet<Result<(), SupervisorError>>,
    mut relay_http_server: Option<http_server::Server>,
    mut quic_server: Option<QuicServer>,
) -> Result<(), SupervisorError> {
    // A disabled server is replaced by a never-ready future; its `select!` branch is also
    // switched off by the `if` guard, so the placeholder is never polled.
    let quic_enabled = quic_server.is_some();
    let mut quic_fut = match quic_server {
        Some(ref mut server) => n0_future::Either::Left(server.task_handle()),
        None => n0_future::Either::Right(n0_future::future::pending()),
    };
    let relay_enabled = relay_http_server.is_some();
    let mut relay_fut = match relay_http_server {
        Some(ref mut server) => n0_future::Either::Left(server.task_handle()),
        None => n0_future::Either::Right(n0_future::future::pending()),
    };
    // `biased`: auxiliary tasks are checked first, then QUIC, then the relay server.  An
    // empty `JoinSet` yields `None`, which fails the `Some(ret)` pattern and disables that
    // branch; if every branch is disabled the `else` arm fires.
    let res = tokio::select! {
        biased;
        Some(ret) = tasks.join_next() => ret,
        ret = &mut quic_fut, if quic_enabled => ret.map(Ok),
        ret = &mut relay_fut, if relay_enabled => ret.map(Ok),
        else => Ok(Err(e!(SupervisorError::NoRelayServicesEnabled))),
    };
    let ret = match res {
        Ok(Ok(())) => {
            debug!("Task exited");
            Ok(())
        }
        Ok(Err(err)) => {
            error!(%err, "Task failed");
            Err(err)
        }
        Err(err) => {
            if let Ok(panic) = err.try_into_panic() {
                error!("Task panicked");
                std::panic::resume_unwind(panic);
            }
            debug!("Task cancelled");
            Err(e!(SupervisorError::TaskCancelled))
        }
    };

    // Ensure the HTTP server terminated, there is no harm in calling this after it is
    // already shut down.
    if let Some(server) = relay_http_server {
        server.shutdown();
    }

    // Ensure the QUIC server is closed
    if let Some(server) = quic_server {
        server.shutdown().await;
    }

    // Stop all remaining tasks
    tasks.shutdown().await;

    ret
}
615
616fn root_handler(
617    _r: Request<Incoming>,
618    response: ResponseBuilder,
619) -> HyperResult<Response<BytesBody>> {
620    response
621        .status(StatusCode::OK)
622        .header("Content-Type", "text/html; charset=utf-8")
623        .body(INDEX.into())
624        .map_err(|err| Box::new(err) as HyperError)
625}
626
627/// HTTP latency queries
628fn probe_handler(
629    _r: Request<Incoming>,
630    response: ResponseBuilder,
631) -> HyperResult<Response<BytesBody>> {
632    response
633        .status(StatusCode::OK)
634        .header("Access-Control-Allow-Origin", "*")
635        .body(body_empty())
636        .map_err(|err| Box::new(err) as HyperError)
637}
638
639fn robots_handler(
640    _r: Request<Incoming>,
641    response: ResponseBuilder,
642) -> HyperResult<Response<BytesBody>> {
643    response
644        .status(StatusCode::OK)
645        .body(ROBOTS_TXT.into())
646        .map_err(|err| Box::new(err) as HyperError)
647}
648
649/// For captive portal detection.
650fn serve_no_content_handler<B: hyper::body::Body>(
651    r: Request<B>,
652    mut response: ResponseBuilder,
653) -> HyperResult<Response<BytesBody>> {
654    let check = |c: &HeaderValue| {
655        !c.is_empty() && c.len() < 64 && c.as_bytes().iter().all(|c| is_challenge_char(*c as char))
656    };
657
658    if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER)
659        && check(challenge)
660    {
661        response = response.header(
662            NO_CONTENT_RESPONSE_HEADER,
663            format!("response {}", challenge.to_str()?),
664        );
665    }
666
667    response
668        .status(StatusCode::NO_CONTENT)
669        .body(body_empty())
670        .map_err(|err| Box::new(err) as HyperError)
671}
672
/// Returns `true` if `c` may appear in a captive-portal challenge token.
///
/// The alphabet is deliberately small: ASCII letters, ASCII digits, `.`, `-` and `_`.
fn is_challenge_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_')
}
682
/// Health check response, serialized as the JSON body of `/healthz`.
///
/// Field order here is the key order of the emitted JSON object.
#[derive(Serialize)]
struct Health {
    /// Always `"ok"` when produced by `healthz_handler`.
    status: &'static str,
    /// Crate version (`CARGO_PKG_VERSION`).
    version: &'static str,
    /// Git commit the binary was built from, or `"unknown"`.
    git_hash: &'static str,
}
690
691fn healthz_handler(
692    _r: Request<Incoming>,
693    response: ResponseBuilder,
694) -> HyperResult<Response<BytesBody>> {
695    let health = Health {
696        status: "ok",
697        version: env!("CARGO_PKG_VERSION"),
698        git_hash: option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"),
699    };
700    let body = serde_json::to_string(&health).unwrap_or_else(|_| r#"{"status":"error"}"#.into());
701    response
702        .status(StatusCode::OK)
703        .header("Content-Type", "application/json")
704        .body(body.into())
705        .map_err(|err| Box::new(err) as HyperError)
706}
707
708/// This is a future that never returns, drop it to cancel/abort.
709async fn run_captive_portal_service(http_listener: TcpListener) {
710    info!("serving");
711
712    // If this future is cancelled, this is dropped and all tasks are aborted.
713    let mut tasks = JoinSet::new();
714
715    loop {
716        tokio::select! {
717            biased;
718
719            Some(res) = tasks.join_next() => {
720                if let Err(err) = res
721                    && err.is_panic()
722                {
723                    panic!("task panicked: {err:#?}");
724                }
725            }
726
727            res = http_listener.accept() => {
728                match res {
729                    Ok((stream, peer_addr)) => {
730                        debug!(%peer_addr, "Connection opened",);
731                        let handler = CaptivePortalService;
732
733                        tasks.spawn(async move {
734                            let stream = crate::server::streams::MaybeTlsStream::Plain(stream);
735                            let stream = hyper_util::rt::TokioIo::new(stream);
736                            if let Err(err) = hyper::server::conn::http1::Builder::new()
737                                .serve_connection(stream, handler)
738                                .with_upgrades()
739                                .await
740                            {
741                                error!("Failed to serve connection: {err:?}");
742                            }
743                        });
744                    }
745                    Err(err) => {
746                        error!(
747                            "[CaptivePortalService] failed to accept connection: {:#?}",
748                            err
749                        );
750                    }
751                }
752            }
753        }
754    }
755}
756
757#[derive(Clone)]
758struct CaptivePortalService;
759
760impl hyper::service::Service<Request<Incoming>> for CaptivePortalService {
761    type Response = Response<BytesBody>;
762    type Error = HyperError;
763    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
764
765    fn call(&self, req: Request<Incoming>) -> Self::Future {
766        match (req.method(), req.uri().path()) {
767            // Captive Portal checker
768            (&Method::GET, "/generate_204") => {
769                Box::pin(async move { serve_no_content_handler(req, Response::builder()) })
770            }
771            _ => {
772                // Return 404 not found response.
773                let r = Response::builder()
774                    .status(StatusCode::NOT_FOUND)
775                    .body(NOTFOUND.into())
776                    .map_err(|err| Box::new(err) as HyperError);
777                Box::pin(async move { r })
778            }
779        }
780    }
781}
782
783#[cfg(test)]
784mod tests {
785    use std::{net::Ipv4Addr, sync::Arc, time::Duration};
786
787    use http::StatusCode;
788    use iroh_base::{EndpointId, RelayUrl, SecretKey};
789    use n0_error::Result;
790    use n0_future::{FutureExt, SinkExt, StreamExt};
791    use n0_tracing_test::traced_test;
792    use rand::{RngExt, SeedableRng};
793    use tracing::{info, instrument};
794
795    use super::{
796        Access, AccessConfig, NO_CONTENT_CHALLENGE_HEADER, NO_CONTENT_RESPONSE_HEADER, RelayConfig,
797        Server, ServerConfig, SpawnError,
798    };
799    use crate::{
800        client::{ClientBuilder, ConnectError},
801        dns::DnsResolver,
802        protos::{
803            handshake,
804            relay::{ClientToRelayMsg, Datagrams, RelayToClientMsg},
805        },
806        tls::{CaRootsConfig, default_provider},
807    };
808
809    async fn spawn_local_relay() -> std::result::Result<Server, SpawnError> {
810        Server::spawn(ServerConfig::<(), ()> {
811            relay: Some(RelayConfig::<(), ()> {
812                http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),
813                tls: None,
814                limits: Default::default(),
815                key_cache_capacity: Some(1024),
816                access: AccessConfig::Everyone,
817            }),
818            quic: None,
819            metrics_addr: None,
820        })
821        .await
822    }
823
824    #[instrument]
825    async fn try_send_recv(
826        client_a: &mut crate::client::Client,
827        client_b: &mut crate::client::Client,
828        b_key: EndpointId,
829        msg: Datagrams,
830    ) -> Result<RelayToClientMsg> {
831        // try resend 10 times
832        for _ in 0..10 {
833            client_a
834                .send(ClientToRelayMsg::Datagrams {
835                    dst_endpoint_id: b_key,
836                    datagrams: msg.clone(),
837                })
838                .await?;
839            let Ok(res) = tokio::time::timeout(Duration::from_millis(500), client_b.next()).await
840            else {
841                continue;
842            };
843            let res = res.expect("stream finished")?;
844            return Ok(res);
845        }
846        panic!("failed to send and recv message");
847    }
848
849    fn dns_resolver() -> DnsResolver {
850        DnsResolver::new()
851    }
852
853    fn ring_config() -> rustls::ClientConfig {
854        rustls::ClientConfig::builder_with_provider(Arc::new(
855            rustls::crypto::ring::default_provider(),
856        ))
857        .with_safe_default_protocol_versions()
858        .unwrap()
859        .with_root_certificates(rustls::RootCertStore::empty())
860        .with_no_client_auth()
861    }
862
863    #[tokio::test]
864    #[traced_test]
865    async fn test_no_services() {
866        let mut server = Server::spawn(ServerConfig::<(), ()>::default())
867            .await
868            .unwrap();
869        let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle())
870            .await
871            .expect("timeout, server not finished")
872            .expect("server task JoinError");
873        assert!(res.is_err());
874    }
875
876    #[tokio::test]
877    #[traced_test]
878    async fn test_conflicting_bind() {
879        let mut server = Server::spawn(ServerConfig::<(), ()> {
880            relay: Some(RelayConfig {
881                http_bind_addr: (Ipv4Addr::LOCALHOST, 1234).into(),
882                tls: None,
883                limits: Default::default(),
884                key_cache_capacity: Some(1024),
885                access: AccessConfig::Everyone,
886            }),
887            quic: None,
888            metrics_addr: Some((Ipv4Addr::LOCALHOST, 1234).into()),
889        })
890        .await
891        .unwrap();
892        let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle())
893            .await
894            .expect("timeout, server not finished")
895            .expect("server task JoinError");
896        assert!(res.is_err()); // AddrInUse
897    }
898
899    #[tokio::test]
900    #[traced_test]
901    async fn test_root_handler() {
902        let server = spawn_local_relay().await.unwrap();
903        let url = format!("http://{}", server.http_addr().unwrap());
904
905        let client = reqwest::Client::builder()
906            .use_preconfigured_tls(ring_config())
907            .build()
908            .unwrap();
909        let response = client.get(&url).send().await.unwrap();
910        assert_eq!(response.status(), 200);
911        let body = response.text().await.unwrap();
912        assert!(body.contains("iroh.computer"));
913    }
914
915    #[tokio::test]
916    #[traced_test]
917    async fn test_captive_portal_service() {
918        let server = spawn_local_relay().await.unwrap();
919        let url = format!("http://{}/generate_204", server.http_addr().unwrap());
920        let challenge = "123az__.";
921
922        let client = reqwest::Client::builder()
923            .use_preconfigured_tls(ring_config())
924            .build()
925            .unwrap();
926        let response = client
927            .get(&url)
928            .header(NO_CONTENT_CHALLENGE_HEADER, challenge)
929            .send()
930            .await
931            .unwrap();
932        assert_eq!(response.status(), StatusCode::NO_CONTENT);
933        let header = response.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap();
934        assert_eq!(header.to_str().unwrap(), format!("response {challenge}"));
935        let body = response.text().await.unwrap();
936        assert!(body.is_empty());
937    }
938
939    #[tokio::test]
940    #[traced_test]
941    async fn test_relay_clients() -> Result<()> {
942        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
943        let server = spawn_local_relay().await?;
944
945        let relay_url = format!("http://{}", server.http_addr().unwrap());
946        let relay_url: RelayUrl = relay_url.parse()?;
947
948        let client_config = CaRootsConfig::default()
949            .client_config(default_provider())
950            .unwrap();
951
952        // set up client a
953        let a_secret_key = SecretKey::from_bytes(&rng.random());
954        let a_key = a_secret_key.public();
955        let resolver = dns_resolver();
956        info!("client a build & connect");
957        let mut client_a = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver.clone())
958            .tls_client_config(client_config.clone())
959            .connect()
960            .await?;
961
962        // set up client b
963        let b_secret_key = SecretKey::from_bytes(&rng.random());
964        let b_key = b_secret_key.public();
965        info!("client b build & connect");
966        let mut client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver.clone())
967            .tls_client_config(client_config)
968            .connect()
969            .await?;
970
971        info!("sending a -> b");
972
973        // send message from a to b
974        let msg = Datagrams::from("hello, b");
975        let res = try_send_recv(&mut client_a, &mut client_b, b_key, msg.clone()).await?;
976        let RelayToClientMsg::Datagrams {
977            remote_endpoint_id,
978            datagrams,
979        } = res
980        else {
981            panic!("client_b received unexpected message {res:?}");
982        };
983
984        assert_eq!(a_key, remote_endpoint_id);
985        assert_eq!(msg, datagrams);
986
987        info!("sending b -> a");
988        // send message from b to a
989        let msg = Datagrams::from("howdy, a");
990        let res = try_send_recv(&mut client_b, &mut client_a, a_key, msg.clone()).await?;
991
992        let RelayToClientMsg::Datagrams {
993            remote_endpoint_id,
994            datagrams,
995        } = res
996        else {
997            panic!("client_a received unexpected message {res:?}");
998        };
999
1000        assert_eq!(b_key, remote_endpoint_id);
1001        assert_eq!(msg, datagrams);
1002
1003        Ok(())
1004    }
1005
    /// Verifies that an `AccessConfig::Restricted` callback gates relay connections.
    ///
    /// The callback denies client a's endpoint id and allows everything else. Client a's
    /// connect attempt must fail with a handshake `ServerDeniedAuth` error whose reason is
    /// `"not authorized"`. Clients b and c must still connect and exchange datagrams.
    #[tokio::test]
    #[traced_test]
    async fn test_relay_access_control() -> Result<()> {
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
        // NOTE(review): this guard stays entered across every `.await` in the test. The
        // tracing docs warn that holding a `Span::enter` guard across await points produces
        // incorrect traces; `Instrument::instrument` is the async-safe alternative. Kept as-is
        // because the reason for entering this span is not visible here — confirm intent.
        let current_span = tracing::info_span!("this is a test");
        let _guard = current_span.enter();

        let client_config = CaRootsConfig::default()
            .client_config(default_provider())
            .unwrap();

        // Client a's key is drawn first so the access callback below can capture it.
        let a_secret_key = SecretKey::from_bytes(&rng.random());
        let a_key = a_secret_key.public();

        let server = Server::spawn(ServerConfig::<(), ()> {
            relay: Some(RelayConfig::<(), ()> {
                http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),
                tls: None,
                limits: Default::default(),
                key_cache_capacity: Some(1024),
                // Per-endpoint access check: returns a boxed future resolving to Deny for
                // client a and Allow for any other endpoint id.
                access: AccessConfig::Restricted(Box::new(move |endpoint_id| {
                    async move {
                        info!("checking {}", endpoint_id);
                        // reject endpoint a
                        if endpoint_id == a_key {
                            Access::Deny
                        } else {
                            Access::Allow
                        }
                    }
                    .boxed()
                })),
            }),
            quic: None,
            metrics_addr: None,
        })
        .await?;

        let relay_url = format!("http://{}", server.http_addr().unwrap());
        let relay_url: RelayUrl = relay_url.parse()?;

        // set up client a: its connect attempt is expected to be refused during the handshake
        let resolver = dns_resolver();
        let result = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver)
            .tls_client_config(client_config.clone())
            .connect()
            .await;

        assert!(
            matches!(result, Err(ConnectError::Handshake { source: handshake::Error::ServerDeniedAuth { reason, .. }, .. }) if reason == "not authorized")
        );

        // test that another client has access

        // set up client b
        let b_secret_key = SecretKey::from_bytes(&rng.random());
        let b_key = b_secret_key.public();

        let resolver = dns_resolver();
        let mut client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver)
            .tls_client_config(client_config.clone())
            .connect()
            .await?;

        // set up client c
        let c_secret_key = SecretKey::from_bytes(&rng.random());
        let c_key = c_secret_key.public();

        let resolver = dns_resolver();
        let mut client_c = ClientBuilder::new(relay_url.clone(), c_secret_key, resolver)
            .tls_client_config(client_config)
            .connect()
            .await?;

        // send message from b to c; c must see b as the sender and the exact payload
        let msg = Datagrams::from("hello, c");
        let res = try_send_recv(&mut client_b, &mut client_c, c_key, msg.clone()).await?;

        if let RelayToClientMsg::Datagrams {
            remote_endpoint_id,
            datagrams,
        } = res
        {
            assert_eq!(b_key, remote_endpoint_id);
            assert_eq!(msg, datagrams);
        } else {
            panic!("client_c received unexpected message {res:?}");
        }

        Ok(())
    }
1097
1098    #[tokio::test]
1099    #[traced_test]
1100    async fn test_relay_clients_full() -> Result<()> {
1101        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1102        let server = spawn_local_relay().await.unwrap();
1103        let relay_url = format!("http://{}", server.http_addr().unwrap());
1104        let relay_url: RelayUrl = relay_url.parse().unwrap();
1105
1106        let client_config = CaRootsConfig::default()
1107            .client_config(default_provider())
1108            .unwrap();
1109
1110        // set up client a
1111        let a_secret_key = SecretKey::from_bytes(&rng.random());
1112        let resolver = dns_resolver();
1113        let mut client_a = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver.clone())
1114            .tls_client_config(client_config.clone())
1115            .connect()
1116            .await?;
1117
1118        // set up client b
1119        let b_secret_key = SecretKey::from_bytes(&rng.random());
1120        let b_key = b_secret_key.public();
1121        let _client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver.clone())
1122            .tls_client_config(client_config)
1123            .connect()
1124            .await?;
1125
1126        // send messages from a to b, without b receiving anything.
1127        // we should still keep succeeding to send, even if the packet won't be forwarded
1128        // by the relay server because the server's send queue for b fills up.
1129        let msg = Datagrams::from("hello, b");
1130        for _i in 0..1000 {
1131            client_a
1132                .send(ClientToRelayMsg::Datagrams {
1133                    dst_endpoint_id: b_key,
1134                    datagrams: msg.clone(),
1135                })
1136                .await?;
1137        }
1138        Ok(())
1139    }
1140}