1use std::{fmt, future::Future, net::SocketAddr, num::NonZeroU32, pin::Pin, sync::Arc};
19
20use derive_more::Debug;
21use http::{
22 HeaderMap, HeaderValue, Method, Request, Response, StatusCode, header::InvalidHeaderValue,
23 response::Builder as ResponseBuilder,
24};
25use hyper::body::Incoming;
26use iroh_base::EndpointId;
27#[cfg(feature = "test-utils")]
28use iroh_base::RelayUrl;
29use n0_error::{e, stack_error};
30use n0_future::{StreamExt, future::Boxed};
31use tokio::{
32 net::TcpListener,
33 task::{JoinError, JoinSet},
34};
35use tokio_util::task::AbortOnDropHandle;
36use tracing::{Instrument, debug, error, info, info_span, instrument};
37
38use crate::{
39 defaults::DEFAULT_KEY_CACHE_CAPACITY,
40 http::RELAY_PROBE_PATH,
41 quic::server::{QuicServer, QuicSpawnError, ServerHandle as QuicServerHandle},
42};
43
44mod client;
45mod clients;
46mod http_server;
47mod metrics;
48pub(crate) mod resolver;
49pub(crate) mod streams;
50#[cfg(feature = "test-utils")]
51pub mod testing;
52
53pub use self::{
54 metrics::{Metrics, RelayMetrics},
55 resolver::{DEFAULT_CERT_RELOAD_INTERVAL, ReloadingResolver},
56};
57
58const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Iroh-Challenge";
59const NO_CONTENT_RESPONSE_HEADER: &str = "X-Iroh-Response";
60const NOTFOUND: &[u8] = b"Not Found";
61const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n";
62const INDEX: &[u8] = br#"<html><body>
63<h1>Iroh Relay</h1>
64<p>
65 This is an <a href="https://iroh.computer/">Iroh</a> Relay server.
66</p>
67"#;
68const TLS_HEADERS: [(&str, &str); 2] = [
69 (
70 "Strict-Transport-Security",
71 "max-age=63072000; includeSubDomains",
72 ),
73 (
74 "Content-Security-Policy",
75 "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'",
76 ),
77];
78
79type BytesBody = http_body_util::Full<hyper::body::Bytes>;
80type HyperError = Box<dyn std::error::Error + Send + Sync>;
81type HyperResult<T> = std::result::Result<T, HyperError>;
82
83fn body_empty() -> BytesBody {
85 http_body_util::Full::new(hyper::body::Bytes::new())
86}
87
88#[derive(Debug, Default)]
93pub struct ServerConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
94 pub relay: Option<RelayConfig<EC, EA>>,
96 pub quic: Option<QuicConfig>,
98 #[cfg(feature = "metrics")]
100 pub metrics_addr: Option<SocketAddr>,
101}
102
103#[derive(Debug)]
108pub struct RelayConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
109 pub http_bind_addr: SocketAddr,
117 pub tls: Option<TlsConfig<EC, EA>>,
122 pub limits: Limits,
124 pub key_cache_capacity: Option<usize>,
126 pub access: AccessConfig,
128}
129
130#[derive(derive_more::Debug)]
132pub enum AccessConfig {
133 Everyone,
135 #[debug("restricted")]
137 Restricted(Box<dyn Fn(EndpointId) -> Boxed<Access> + Send + Sync + 'static>),
138}
139
140impl AccessConfig {
141 pub async fn is_allowed(&self, endpoint: EndpointId) -> bool {
143 match self {
144 Self::Everyone => true,
145 Self::Restricted(check) => {
146 let res = check(endpoint).await;
147 matches!(res, Access::Allow)
148 }
149 }
150 }
151}
152
153#[derive(Debug, Copy, Clone, PartialEq, Eq)]
155pub enum Access {
156 Allow,
158 Deny,
160}
161
162#[derive(Debug)]
164pub struct QuicConfig {
165 pub bind_addr: SocketAddr,
169 pub server_config: rustls::ServerConfig,
174}
175
176#[derive(Debug)]
180pub struct TlsConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
181 pub https_bind_addr: SocketAddr,
189 pub quic_bind_addr: SocketAddr,
191 pub cert: CertConfig<EC, EA>,
193 pub server_config: rustls::ServerConfig,
195}
196
197#[derive(Debug, Default)]
200pub struct Limits {
201 pub accept_conn_limit: Option<f64>,
203 pub accept_conn_burst: Option<usize>,
205 pub client_rx: Option<ClientRateLimit>,
207}
208
209#[derive(Debug, Copy, Clone)]
211pub struct ClientRateLimit {
212 pub bytes_per_second: NonZeroU32,
214 pub max_burst_bytes: Option<NonZeroU32>,
216}
217
218#[derive(derive_more::Debug)]
220pub enum CertConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
221 LetsEncrypt {
223 #[debug("AcmeConfig")]
225 state: tokio_rustls_acme::AcmeState<EC, EA>,
226 },
227 Manual {
229 certs: Vec<rustls::pki_types::CertificateDer<'static>>,
231 },
232 Reloading,
234}
235
236#[derive(Debug)]
242pub struct Server {
243 http_addr: Option<SocketAddr>,
245 https_addr: Option<SocketAddr>,
250 quic_addr: Option<SocketAddr>,
252 relay_handle: Option<http_server::ServerHandle>,
254 quic_handle: Option<QuicServerHandle>,
256 supervisor: AbortOnDropHandle<Result<(), SupervisorError>>,
258 certificates: Option<Vec<rustls::pki_types::CertificateDer<'static>>>,
263 metrics: RelayMetrics,
264}
265
266#[allow(missing_docs)]
268#[stack_error(derive, add_meta, std_sources)]
269#[non_exhaustive]
270pub enum SpawnError {
271 #[error("Unable to get local address")]
272 LocalAddr { source: std::io::Error },
273 #[error("Failed to bind QAD listener")]
274 QuicSpawn { source: QuicSpawnError },
275 #[error("Failed to parse TLS header")]
276 TlsHeaderParse { source: InvalidHeaderValue },
277 #[error("Failed to bind TcpListener")]
278 BindTlsListener { source: std::io::Error },
279 #[error("No local address")]
280 NoLocalAddr { source: std::io::Error },
281 #[error("Failed to bind server socket to {addr}")]
282 BindTcpListener {
283 source: std::io::Error,
284 addr: SocketAddr,
285 },
286}
287
288#[allow(missing_docs)]
290#[stack_error(derive, add_meta)]
291#[non_exhaustive]
292pub enum SupervisorError {
293 #[error("Error starting metrics server")]
294 Metrics {
295 #[error(std_err)]
296 source: std::io::Error,
297 },
298 #[error("Acme event stream finished")]
299 AcmeEventStreamFinished {},
300 #[error(transparent)]
301 JoinError {
302 #[error(from, std_err)]
303 source: JoinError,
304 },
305 #[error("No relay services are enabled")]
306 NoRelayServicesEnabled {},
307 #[error("Task cancelled")]
308 TaskCancelled {},
309}
310
311impl Server {
312 pub async fn spawn<EC, EA>(config: ServerConfig<EC, EA>) -> Result<Self, SpawnError>
314 where
315 EC: fmt::Debug + 'static,
316 EA: fmt::Debug + 'static,
317 {
318 let mut tasks = JoinSet::new();
319
320 let metrics = RelayMetrics::default();
321
322 #[cfg(feature = "metrics")]
323 if let Some(addr) = config.metrics_addr {
324 debug!("Starting metrics server");
325 let mut registry = iroh_metrics::Registry::default();
326 registry.register_all(&metrics);
327 tasks.spawn(
328 async move {
329 iroh_metrics::service::start_metrics_server(addr, Arc::new(registry))
330 .await
331 .map_err(|err| e!(SupervisorError::Metrics, err))
332 }
333 .instrument(info_span!("metrics-server")),
334 );
335 }
336
337 let certificates = config.relay.as_ref().and_then(|relay| {
339 relay.tls.as_ref().and_then(|tls| match tls.cert {
340 CertConfig::LetsEncrypt { .. } => None,
341 CertConfig::Manual { ref certs, .. } => Some(certs.clone()),
342 CertConfig::Reloading => None,
343 })
344 });
345
346 let quic_server = match config.quic {
347 Some(quic_config) => {
348 debug!("Starting QUIC server {}", quic_config.bind_addr);
349 Some(QuicServer::spawn(quic_config).map_err(|err| e!(SpawnError::QuicSpawn, err))?)
350 }
351 None => None,
352 };
353 let quic_addr = quic_server.as_ref().map(|srv| srv.bind_addr());
354 let quic_handle = quic_server.as_ref().map(|srv| srv.handle());
355
356 let (relay_server, http_addr) = match config.relay {
357 Some(relay_config) => {
358 debug!("Starting Relay server");
359 let mut headers = HeaderMap::new();
360 for (name, value) in TLS_HEADERS.iter() {
361 headers.insert(
362 *name,
363 value
364 .parse()
365 .map_err(|err| e!(SpawnError::TlsHeaderParse, err))?,
366 );
367 }
368 let relay_bind_addr = match relay_config.tls {
369 Some(ref tls) => tls.https_bind_addr,
370 None => relay_config.http_bind_addr,
371 };
372 let key_cache_capacity = relay_config
373 .key_cache_capacity
374 .unwrap_or(DEFAULT_KEY_CACHE_CAPACITY);
375 let mut builder = http_server::ServerBuilder::new(relay_bind_addr)
376 .metrics(metrics.server.clone())
377 .headers(headers)
378 .key_cache_capacity(key_cache_capacity)
379 .access(relay_config.access)
380 .request_handler(Method::GET, "/", Box::new(root_handler))
381 .request_handler(Method::GET, "/index.html", Box::new(root_handler))
382 .request_handler(Method::GET, RELAY_PROBE_PATH, Box::new(probe_handler))
383 .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler));
384 if let Some(cfg) = relay_config.limits.client_rx {
385 builder = builder.client_rx_ratelimit(cfg);
386 }
387 let http_addr = match relay_config.tls {
388 Some(tls_config) => {
389 let server_tls_config = match tls_config.cert {
390 CertConfig::LetsEncrypt { mut state } => {
391 let acceptor =
392 http_server::TlsAcceptor::LetsEncrypt(state.acceptor());
393 tasks.spawn(
394 async move {
395 while let Some(event) = state.next().await {
396 match event {
397 Ok(ok) => debug!("acme event: {ok:?}"),
398 Err(err) => error!("error: {err:?}"),
399 }
400 }
401 Err(e!(SupervisorError::AcmeEventStreamFinished))
402 }
403 .instrument(info_span!("acme")),
404 );
405 Some(http_server::TlsConfig {
406 config: Arc::new(tls_config.server_config),
407 acceptor,
408 })
409 }
410 CertConfig::Manual { .. } | CertConfig::Reloading => {
411 let server_config = Arc::new(tls_config.server_config);
412 let acceptor =
413 tokio_rustls::TlsAcceptor::from(server_config.clone());
414 let acceptor = http_server::TlsAcceptor::Manual(acceptor);
415 Some(http_server::TlsConfig {
416 config: server_config,
417 acceptor,
418 })
419 }
420 };
421 builder = builder.tls_config(server_tls_config);
422
423 let http_listener = TcpListener::bind(&relay_config.http_bind_addr)
426 .await
427 .map_err(|err| e!(SpawnError::BindTlsListener, err))?;
428 let http_addr = http_listener
429 .local_addr()
430 .map_err(|err| e!(SpawnError::NoLocalAddr, err))?;
431 tasks.spawn(
432 async move {
433 run_captive_portal_service(http_listener).await;
434 Ok(())
435 }
436 .instrument(info_span!("http-service", addr = %http_addr)),
437 );
438 Some(http_addr)
439 }
440 None => {
441 builder = builder.request_handler(
444 Method::GET,
445 "/generate_204",
446 Box::new(serve_no_content_handler),
447 );
448 None
449 }
450 };
451 let relay_server = builder.spawn().await?;
452 (Some(relay_server), http_addr)
453 }
454 None => (None, None),
455 };
456 let relay_addr = relay_server.as_ref().map(|srv| srv.addr());
459 let relay_handle = relay_server.as_ref().map(|srv| srv.handle());
460 let task = tokio::spawn(relay_supervisor(tasks, relay_server, quic_server));
461
462 Ok(Self {
463 http_addr: http_addr.or(relay_addr),
464 https_addr: http_addr.and(relay_addr),
465 quic_addr,
466 relay_handle,
467 quic_handle,
468 supervisor: AbortOnDropHandle::new(task),
469 certificates,
470 metrics,
471 })
472 }
473
474 pub async fn shutdown(self) -> Result<(), SupervisorError> {
478 if let Some(handle) = self.relay_handle {
481 handle.shutdown();
482 }
483 if let Some(handle) = self.quic_handle {
484 handle.shutdown();
485 }
486 self.supervisor.await?
487 }
488
489 pub fn task_handle(&mut self) -> &mut AbortOnDropHandle<Result<(), SupervisorError>> {
494 &mut self.supervisor
495 }
496
497 pub fn https_addr(&self) -> Option<SocketAddr> {
499 self.https_addr
500 }
501
502 pub fn http_addr(&self) -> Option<SocketAddr> {
504 self.http_addr
505 }
506
507 pub fn quic_addr(&self) -> Option<SocketAddr> {
509 self.quic_addr
510 }
511
512 pub fn certificates(&self) -> Option<Vec<rustls::pki_types::CertificateDer<'static>>> {
514 self.certificates.clone()
515 }
516
517 #[cfg(feature = "test-utils")]
521 pub fn https_url(&self) -> Option<RelayUrl> {
522 self.https_addr.map(|addr| {
523 url::Url::parse(&format!("https://{addr}"))
524 .expect("valid url")
525 .into()
526 })
527 }
528
529 #[cfg(feature = "test-utils")]
533 pub fn http_url(&self) -> Option<RelayUrl> {
534 self.http_addr.map(|addr| {
535 url::Url::parse(&format!("http://{addr}"))
536 .expect("valid url")
537 .into()
538 })
539 }
540
541 pub fn metrics(&self) -> &RelayMetrics {
543 &self.metrics
544 }
545}
546
547#[instrument(skip_all)]
552async fn relay_supervisor(
553 mut tasks: JoinSet<Result<(), SupervisorError>>,
554 mut relay_http_server: Option<http_server::Server>,
555 mut quic_server: Option<QuicServer>,
556) -> Result<(), SupervisorError> {
557 let quic_enabled = quic_server.is_some();
558 let mut quic_fut = match quic_server {
559 Some(ref mut server) => n0_future::Either::Left(server.task_handle()),
560 None => n0_future::Either::Right(n0_future::future::pending()),
561 };
562 let relay_enabled = relay_http_server.is_some();
563 let mut relay_fut = match relay_http_server {
564 Some(ref mut server) => n0_future::Either::Left(server.task_handle()),
565 None => n0_future::Either::Right(n0_future::future::pending()),
566 };
567 let res = tokio::select! {
568 biased;
569 Some(ret) = tasks.join_next() => ret,
570 ret = &mut quic_fut, if quic_enabled => ret.map(Ok),
571 ret = &mut relay_fut, if relay_enabled => ret.map(Ok),
572 else => Ok(Err(e!(SupervisorError::NoRelayServicesEnabled))),
573 };
574 let ret = match res {
575 Ok(Ok(())) => {
576 debug!("Task exited");
577 Ok(())
578 }
579 Ok(Err(err)) => {
580 error!(%err, "Task failed");
581 Err(err)
582 }
583 Err(err) => {
584 if let Ok(panic) = err.try_into_panic() {
585 error!("Task panicked");
586 std::panic::resume_unwind(panic);
587 }
588 debug!("Task cancelled");
589 Err(e!(SupervisorError::TaskCancelled))
590 }
591 };
592
593 if let Some(server) = relay_http_server {
596 server.shutdown();
597 }
598
599 if let Some(server) = quic_server {
601 server.shutdown().await;
602 }
603
604 tasks.shutdown().await;
606
607 ret
608}
609
610fn root_handler(
611 _r: Request<Incoming>,
612 response: ResponseBuilder,
613) -> HyperResult<Response<BytesBody>> {
614 response
615 .status(StatusCode::OK)
616 .header("Content-Type", "text/html; charset=utf-8")
617 .body(INDEX.into())
618 .map_err(|err| Box::new(err) as HyperError)
619}
620
621fn probe_handler(
623 _r: Request<Incoming>,
624 response: ResponseBuilder,
625) -> HyperResult<Response<BytesBody>> {
626 response
627 .status(StatusCode::OK)
628 .header("Access-Control-Allow-Origin", "*")
629 .body(body_empty())
630 .map_err(|err| Box::new(err) as HyperError)
631}
632
633fn robots_handler(
634 _r: Request<Incoming>,
635 response: ResponseBuilder,
636) -> HyperResult<Response<BytesBody>> {
637 response
638 .status(StatusCode::OK)
639 .body(ROBOTS_TXT.into())
640 .map_err(|err| Box::new(err) as HyperError)
641}
642
643fn serve_no_content_handler<B: hyper::body::Body>(
645 r: Request<B>,
646 mut response: ResponseBuilder,
647) -> HyperResult<Response<BytesBody>> {
648 let check = |c: &HeaderValue| {
649 !c.is_empty() && c.len() < 64 && c.as_bytes().iter().all(|c| is_challenge_char(*c as char))
650 };
651
652 if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) {
653 if check(challenge) {
654 response = response.header(
655 NO_CONTENT_RESPONSE_HEADER,
656 format!("response {}", challenge.to_str()?),
657 );
658 }
659 }
660
661 response
662 .status(StatusCode::NO_CONTENT)
663 .body(body_empty())
664 .map_err(|err| Box::new(err) as HyperError)
665}
666
667fn is_challenge_char(c: char) -> bool {
668 c.is_ascii_lowercase()
670 || c.is_ascii_uppercase()
671 || c.is_ascii_digit()
672 || c == '.'
673 || c == '-'
674 || c == '_'
675}
676
677async fn run_captive_portal_service(http_listener: TcpListener) {
679 info!("serving");
680
681 let mut tasks = JoinSet::new();
683
684 loop {
685 tokio::select! {
686 biased;
687
688 Some(res) = tasks.join_next() => {
689 if let Err(err) = res {
690 if err.is_panic() {
691 panic!("task panicked: {err:#?}");
692 }
693 }
694 }
695
696 res = http_listener.accept() => {
697 match res {
698 Ok((stream, peer_addr)) => {
699 debug!(%peer_addr, "Connection opened",);
700 let handler = CaptivePortalService;
701
702 tasks.spawn(async move {
703 let stream = crate::server::streams::MaybeTlsStream::Plain(stream);
704 let stream = hyper_util::rt::TokioIo::new(stream);
705 if let Err(err) = hyper::server::conn::http1::Builder::new()
706 .serve_connection(stream, handler)
707 .with_upgrades()
708 .await
709 {
710 error!("Failed to serve connection: {err:?}");
711 }
712 });
713 }
714 Err(err) => {
715 error!(
716 "[CaptivePortalService] failed to accept connection: {:#?}",
717 err
718 );
719 }
720 }
721 }
722 }
723 }
724}
725
726#[derive(Clone)]
727struct CaptivePortalService;
728
729impl hyper::service::Service<Request<Incoming>> for CaptivePortalService {
730 type Response = Response<BytesBody>;
731 type Error = HyperError;
732 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
733
734 fn call(&self, req: Request<Incoming>) -> Self::Future {
735 match (req.method(), req.uri().path()) {
736 (&Method::GET, "/generate_204") => {
738 Box::pin(async move { serve_no_content_handler(req, Response::builder()) })
739 }
740 _ => {
741 let r = Response::builder()
743 .status(StatusCode::NOT_FOUND)
744 .body(NOTFOUND.into())
745 .map_err(|err| Box::new(err) as HyperError);
746 Box::pin(async move { r })
747 }
748 }
749 }
750}
751
752#[cfg(test)]
753mod tests {
754 use std::{net::Ipv4Addr, time::Duration};
755
756 use http::StatusCode;
757 use iroh_base::{EndpointId, RelayUrl, SecretKey};
758 use n0_error::Result;
759 use n0_future::{FutureExt, SinkExt, StreamExt};
760 use rand::SeedableRng;
761 use tracing::{info, instrument};
762 use tracing_test::traced_test;
763
764 use super::{
765 Access, AccessConfig, NO_CONTENT_CHALLENGE_HEADER, NO_CONTENT_RESPONSE_HEADER, RelayConfig,
766 Server, ServerConfig, SpawnError,
767 };
768 use crate::{
769 client::{ClientBuilder, ConnectError},
770 dns::DnsResolver,
771 protos::{
772 handshake,
773 relay::{ClientToRelayMsg, Datagrams, RelayToClientMsg},
774 },
775 };
776
777 async fn spawn_local_relay() -> std::result::Result<Server, SpawnError> {
778 Server::spawn(ServerConfig::<(), ()> {
779 relay: Some(RelayConfig::<(), ()> {
780 http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),
781 tls: None,
782 limits: Default::default(),
783 key_cache_capacity: Some(1024),
784 access: AccessConfig::Everyone,
785 }),
786 quic: None,
787 metrics_addr: None,
788 })
789 .await
790 }
791
792 #[instrument]
793 async fn try_send_recv(
794 client_a: &mut crate::client::Client,
795 client_b: &mut crate::client::Client,
796 b_key: EndpointId,
797 msg: Datagrams,
798 ) -> Result<RelayToClientMsg> {
799 for _ in 0..10 {
801 client_a
802 .send(ClientToRelayMsg::Datagrams {
803 dst_endpoint_id: b_key,
804 datagrams: msg.clone(),
805 })
806 .await?;
807 let Ok(res) = tokio::time::timeout(Duration::from_millis(500), client_b.next()).await
808 else {
809 continue;
810 };
811 let res = res.expect("stream finished")?;
812 return Ok(res);
813 }
814 panic!("failed to send and recv message");
815 }
816
817 fn dns_resolver() -> DnsResolver {
818 DnsResolver::new()
819 }
820
821 #[tokio::test]
822 #[traced_test]
823 async fn test_no_services() {
824 let mut server = Server::spawn(ServerConfig::<(), ()>::default())
825 .await
826 .unwrap();
827 let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle())
828 .await
829 .expect("timeout, server not finished")
830 .expect("server task JoinError");
831 assert!(res.is_err());
832 }
833
834 #[tokio::test]
835 #[traced_test]
836 async fn test_conflicting_bind() {
837 let mut server = Server::spawn(ServerConfig::<(), ()> {
838 relay: Some(RelayConfig {
839 http_bind_addr: (Ipv4Addr::LOCALHOST, 1234).into(),
840 tls: None,
841 limits: Default::default(),
842 key_cache_capacity: Some(1024),
843 access: AccessConfig::Everyone,
844 }),
845 quic: None,
846 metrics_addr: Some((Ipv4Addr::LOCALHOST, 1234).into()),
847 })
848 .await
849 .unwrap();
850 let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle())
851 .await
852 .expect("timeout, server not finished")
853 .expect("server task JoinError");
854 assert!(res.is_err()); }
856
857 #[tokio::test]
858 #[traced_test]
859 async fn test_root_handler() {
860 let server = spawn_local_relay().await.unwrap();
861 let url = format!("http://{}", server.http_addr().unwrap());
862
863 let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
864 let response = client.get(&url).send().await.unwrap();
865 assert_eq!(response.status(), 200);
866 let body = response.text().await.unwrap();
867 assert!(body.contains("iroh.computer"));
868 }
869
870 #[tokio::test]
871 #[traced_test]
872 async fn test_captive_portal_service() {
873 let server = spawn_local_relay().await.unwrap();
874 let url = format!("http://{}/generate_204", server.http_addr().unwrap());
875 let challenge = "123az__.";
876
877 let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
878 let response = client
879 .get(&url)
880 .header(NO_CONTENT_CHALLENGE_HEADER, challenge)
881 .send()
882 .await
883 .unwrap();
884 assert_eq!(response.status(), StatusCode::NO_CONTENT);
885 let header = response.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap();
886 assert_eq!(header.to_str().unwrap(), format!("response {challenge}"));
887 let body = response.text().await.unwrap();
888 assert!(body.is_empty());
889 }
890
891 #[tokio::test]
892 #[traced_test]
893 async fn test_relay_clients() -> Result<()> {
894 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
895 let server = spawn_local_relay().await?;
896
897 let relay_url = format!("http://{}", server.http_addr().unwrap());
898 let relay_url: RelayUrl = relay_url.parse()?;
899
900 let a_secret_key = SecretKey::generate(&mut rng);
902 let a_key = a_secret_key.public();
903 let resolver = dns_resolver();
904 info!("client a build & connect");
905 let mut client_a = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver.clone())
906 .connect()
907 .await?;
908
909 let b_secret_key = SecretKey::generate(&mut rng);
911 let b_key = b_secret_key.public();
912 info!("client b build & connect");
913 let mut client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver.clone())
914 .connect()
915 .await?;
916
917 info!("sending a -> b");
918
919 let msg = Datagrams::from("hello, b");
921 let res = try_send_recv(&mut client_a, &mut client_b, b_key, msg.clone()).await?;
922 let RelayToClientMsg::Datagrams {
923 remote_endpoint_id,
924 datagrams,
925 } = res
926 else {
927 panic!("client_b received unexpected message {res:?}");
928 };
929
930 assert_eq!(a_key, remote_endpoint_id);
931 assert_eq!(msg, datagrams);
932
933 info!("sending b -> a");
934 let msg = Datagrams::from("howdy, a");
936 let res = try_send_recv(&mut client_b, &mut client_a, a_key, msg.clone()).await?;
937
938 let RelayToClientMsg::Datagrams {
939 remote_endpoint_id,
940 datagrams,
941 } = res
942 else {
943 panic!("client_a received unexpected message {res:?}");
944 };
945
946 assert_eq!(b_key, remote_endpoint_id);
947 assert_eq!(msg, datagrams);
948
949 Ok(())
950 }
951
952 #[tokio::test]
953 #[traced_test]
954 async fn test_relay_access_control() -> Result<()> {
955 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
956 let current_span = tracing::info_span!("this is a test");
957 let _guard = current_span.enter();
958
959 let a_secret_key = SecretKey::generate(&mut rng);
960 let a_key = a_secret_key.public();
961
962 let server = Server::spawn(ServerConfig::<(), ()> {
963 relay: Some(RelayConfig::<(), ()> {
964 http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),
965 tls: None,
966 limits: Default::default(),
967 key_cache_capacity: Some(1024),
968 access: AccessConfig::Restricted(Box::new(move |endpoint_id| {
969 async move {
970 info!("checking {}", endpoint_id);
971 if endpoint_id == a_key {
973 Access::Deny
974 } else {
975 Access::Allow
976 }
977 }
978 .boxed()
979 })),
980 }),
981 quic: None,
982 metrics_addr: None,
983 })
984 .await?;
985
986 let relay_url = format!("http://{}", server.http_addr().unwrap());
987 let relay_url: RelayUrl = relay_url.parse()?;
988
989 let resolver = dns_resolver();
991 let result = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver)
992 .connect()
993 .await;
994
995 assert!(
996 matches!(result, Err(ConnectError::Handshake { source: handshake::Error::ServerDeniedAuth { reason, .. }, .. }) if reason == "not authorized")
997 );
998
999 let b_secret_key = SecretKey::generate(&mut rng);
1003 let b_key = b_secret_key.public();
1004
1005 let resolver = dns_resolver();
1006 let mut client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver)
1007 .connect()
1008 .await?;
1009
1010 let c_secret_key = SecretKey::generate(&mut rng);
1012 let c_key = c_secret_key.public();
1013
1014 let resolver = dns_resolver();
1015 let mut client_c = ClientBuilder::new(relay_url.clone(), c_secret_key, resolver)
1016 .connect()
1017 .await?;
1018
1019 let msg = Datagrams::from("hello, c");
1021 let res = try_send_recv(&mut client_b, &mut client_c, c_key, msg.clone()).await?;
1022
1023 if let RelayToClientMsg::Datagrams {
1024 remote_endpoint_id,
1025 datagrams,
1026 } = res
1027 {
1028 assert_eq!(b_key, remote_endpoint_id);
1029 assert_eq!(msg, datagrams);
1030 } else {
1031 panic!("client_c received unexpected message {res:?}");
1032 }
1033
1034 Ok(())
1035 }
1036
1037 #[tokio::test]
1038 #[traced_test]
1039 async fn test_relay_clients_full() -> Result<()> {
1040 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1041 let server = spawn_local_relay().await.unwrap();
1042 let relay_url = format!("http://{}", server.http_addr().unwrap());
1043 let relay_url: RelayUrl = relay_url.parse().unwrap();
1044
1045 let a_secret_key = SecretKey::generate(&mut rng);
1047 let resolver = dns_resolver();
1048 let mut client_a = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver.clone())
1049 .connect()
1050 .await?;
1051
1052 let b_secret_key = SecretKey::generate(&mut rng);
1054 let b_key = b_secret_key.public();
1055 let _client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver.clone())
1056 .connect()
1057 .await?;
1058
1059 let msg = Datagrams::from("hello, b");
1063 for _i in 0..1000 {
1064 client_a
1065 .send(ClientToRelayMsg::Datagrams {
1066 dst_endpoint_id: b_key,
1067 datagrams: msg.clone(),
1068 })
1069 .await?;
1070 }
1071 Ok(())
1072 }
1073}