1use std::{fmt, future::Future, net::SocketAddr, pin::Pin, sync::Arc};
13
14use anyhow::{anyhow, bail, Context, Result};
15use futures_lite::StreamExt;
16use http::{
17 response::Builder as ResponseBuilder, HeaderMap, Method, Request, Response, StatusCode,
18};
19use hyper::body::Incoming;
20use iroh_metrics::inc;
21use stun_metrics::StunMetrics;
23use tokio::{
24 net::{TcpListener, UdpSocket},
25 task::JoinSet,
26};
27use tokio_util::task::AbortOnDropHandle;
28use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
29
30use crate::{
31 relay::http::{LEGACY_RELAY_PROBE_PATH, RELAY_PROBE_PATH},
32 stun,
33};
34
35pub(crate) mod actor;
36pub(crate) mod client_conn;
37mod clients;
38mod http_server;
39mod metrics;
40pub(crate) mod streams;
41pub(crate) mod types;
42
43pub use self::{
44 actor::{ClientConnHandler, ServerActorTask},
45 metrics::Metrics,
46 streams::MaybeTlsStream as MaybeTlsStreamServer,
47};
48
/// Request header carrying the captive-portal challenge read by `serve_no_content_handler`.
const NO_CONTENT_CHALLENGE_HEADER: &str = "X-Tailscale-Challenge";
/// Response header in which `serve_no_content_handler` echoes an accepted challenge.
const NO_CONTENT_RESPONSE_HEADER: &str = "X-Tailscale-Response";
/// Body returned with the `404 Not Found` response of `CaptivePortalService`.
const NOTFOUND: &[u8] = b"Not Found";
/// Body served by `robots_handler`.
const ROBOTS_TXT: &[u8] = b"User-agent: *\nDisallow: /\n";
/// HTML page served by `root_handler`.
const INDEX: &[u8] = br#"<html><body>
<h1>Iroh Relay</h1>
<p>
  This is an <a href="https://iroh.computer/">Iroh</a> Relay server.
</p>
"#;
/// Headers inserted into the header map handed to the relay HTTP server builder in
/// `Server::spawn`.
///
/// NOTE(review): despite the name, `Server::spawn` adds these whether or not TLS is
/// configured — confirm this is intended.
const TLS_HEADERS: [(&str, &str); 2] = [
    ("Strict-Transport-Security", "max-age=63072000; includeSubDomains"),
    ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'; form-action 'none'; base-uri 'self'; block-all-mixed-content; plugin-types 'none'")
];
63
/// Response body type used by every handler in this module: a single in-memory chunk.
type BytesBody = http_body_util::Full<hyper::body::Bytes>;
/// Boxed, thread-safe error type returned by the HTTP handlers.
type HyperError = Box<dyn std::error::Error + Send + Sync>;
/// Result alias for the HTTP handlers, with [`HyperError`] as the error type.
type HyperResult<T> = std::result::Result<T, HyperError>;
67
68fn body_empty() -> BytesBody {
70 http_body_util::Full::new(hyper::body::Bytes::new())
71}
72
/// Configuration for the whole server: relay, STUN and (optionally) metrics.
///
/// The generic parameters are forwarded to [`CertConfig::LetsEncrypt`], whose ACME
/// configuration is generic over them. When that mode is not used, placeholder types
/// can be supplied, e.g. `ServerConfig::<(), ()>::default()`.
///
/// The default configuration enables no service at all.
#[derive(Debug, Default)]
pub struct ServerConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// Configuration for the relay server; the relay is not started if `None`.
    pub relay: Option<RelayConfig<EC, EA>>,
    /// Configuration for the STUN server; STUN is not started if `None`.
    pub stun: Option<StunConfig>,
    /// Address to serve metrics on; the metrics server is not started if `None`.
    #[cfg(feature = "metrics")]
    #[cfg_attr(iroh_docsrs, doc(cfg(feature = "metrics")))]
    pub metrics_addr: Option<SocketAddr>,
}
88
/// Configuration for the relay HTTP(S) server.
#[derive(Debug)]
pub struct RelayConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// Address for the plain HTTP listener.
    ///
    /// Without TLS the relay server itself binds here. With TLS the relay server binds
    /// [`TlsConfig::https_bind_addr`] instead, and a separate plain HTTP listener that
    /// only answers the captive-portal check is bound to this address.
    pub http_bind_addr: SocketAddr,
    /// TLS configuration; the relay is served over plain HTTP if `None`.
    pub tls: Option<TlsConfig<EC, EA>>,
    /// Connection limits.
    ///
    /// NOTE(review): this field is not read by the code visible in this file section —
    /// confirm where (or whether) it is applied.
    pub limits: Limits,
}
111
/// Configuration for the STUN server.
#[derive(Debug)]
pub struct StunConfig {
    /// UDP address the STUN listener binds to.
    pub bind_addr: SocketAddr,
}
120
/// TLS configuration for the relay server.
#[derive(Debug)]
pub struct TlsConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// Address the relay server binds to when TLS is configured.
    pub https_bind_addr: SocketAddr,
    /// How the TLS certificate is obtained.
    pub cert: CertConfig<EC, EA>,
}
137
/// Limits for the relay server.
///
/// NOTE(review): neither field is consumed by the code visible in this file section;
/// the descriptions below are inferred from the field names only — confirm.
#[derive(Debug, Default)]
pub struct Limits {
    /// Presumably a rate limit for accepting new connections; unlimited if `None` — TODO confirm.
    pub accept_conn_limit: Option<f64>,
    /// Presumably the burst size allowed for accepting new connections — TODO confirm.
    pub accept_conn_burst: Option<usize>,
}
146
/// Source of the TLS certificate used by the relay server.
#[derive(derive_more::Debug)]
pub enum CertConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
    /// Obtain certificates via ACME.
    ///
    /// `Server::spawn` drives the ACME state in a background task and uses its
    /// certificate resolver and acceptor for incoming TLS connections.
    LetsEncrypt {
        /// The ACME configuration; printed only as the placeholder `AcmeConfig` in debug output.
        #[debug("AcmeConfig")]
        config: tokio_rustls_acme::AcmeConfig<EC, EA>,
    },
    /// Use a certificate chain and private key supplied by the caller.
    Manual {
        /// Private key matching the certificate.
        private_key: rustls::pki_types::PrivateKeyDer<'static>,
        /// Certificate chain handed to rustls as a single certificate.
        certs: Vec<rustls::pki_types::CertificateDer<'static>>,
    },
}
164
/// Handle to a running relay / STUN / metrics server created by [`Server::spawn`].
///
/// The supervisor task is held in an `AbortOnDropHandle`, so dropping this handle is
/// expected to abort the supervisor (per `tokio_util`'s handle semantics).
#[derive(Debug)]
pub struct Server {
    /// Address of the plain HTTP service.
    ///
    /// With TLS this is the captive-portal listener; without TLS it is the relay
    /// server itself. `None` if no relay server was configured.
    http_addr: Option<SocketAddr>,
    /// Address of the STUN listener, if one was started.
    stun_addr: Option<SocketAddr>,
    /// Address of the relay server when it is serving HTTPS, otherwise `None`.
    https_addr: Option<SocketAddr>,
    /// Handle used to request shutdown of the relay HTTP server, if one is running.
    relay_handle: Option<http_server::ServerHandle>,
    /// The task running `relay_supervisor`, which owns all other server tasks.
    supervisor: AbortOnDropHandle<Result<()>>,
}
186
187impl Server {
188 pub async fn spawn<EC, EA>(config: ServerConfig<EC, EA>) -> Result<Self>
190 where
191 EC: fmt::Debug + 'static,
192 EA: fmt::Debug + 'static,
193 {
194 let mut tasks = JoinSet::new();
195
196 #[cfg(feature = "metrics")]
197 if let Some(addr) = config.metrics_addr {
198 debug!("Starting metrics server");
199 use iroh_metrics::core::Metric;
200
201 iroh_metrics::core::Core::init(|reg, metrics| {
202 metrics.insert(crate::metrics::RelayMetrics::new(reg));
203 metrics.insert(StunMetrics::new(reg));
204 });
205 tasks.spawn(
206 iroh_metrics::metrics::start_metrics_server(addr)
207 .instrument(info_span!("metrics-server")),
208 );
209 }
210
211 let stun_addr = match config.stun {
213 Some(stun) => {
214 debug!("Starting STUN server");
215 match UdpSocket::bind(stun.bind_addr).await {
216 Ok(sock) => {
217 let addr = sock.local_addr()?;
218 info!("STUN server bound on {addr}");
219 tasks.spawn(
220 server_stun_listener(sock).instrument(info_span!("stun-server", %addr)),
221 );
222 Some(addr)
223 }
224 Err(err) => bail!("failed to bind STUN listener: {err:#?}"),
225 }
226 }
227 None => None,
228 };
229
230 let (relay_server, http_addr) = match config.relay {
232 Some(relay_config) => {
233 debug!("Starting Relay server");
234 let mut headers = HeaderMap::new();
235 for (name, value) in TLS_HEADERS.iter() {
236 headers.insert(*name, value.parse()?);
237 }
238 let relay_bind_addr = match relay_config.tls {
239 Some(ref tls) => tls.https_bind_addr,
240 None => relay_config.http_bind_addr,
241 };
242 let mut builder = http_server::ServerBuilder::new(relay_bind_addr)
243 .headers(headers)
244 .request_handler(Method::GET, "/", Box::new(root_handler))
245 .request_handler(Method::GET, "/index.html", Box::new(root_handler))
246 .request_handler(
247 Method::GET,
248 LEGACY_RELAY_PROBE_PATH,
249 Box::new(probe_handler),
250 ) .request_handler(Method::GET, RELAY_PROBE_PATH, Box::new(probe_handler))
252 .request_handler(Method::GET, "/robots.txt", Box::new(robots_handler));
253 let http_addr = match relay_config.tls {
254 Some(tls_config) => {
255 let server_config = rustls::ServerConfig::builder_with_provider(Arc::new(
256 rustls::crypto::ring::default_provider(),
257 ))
258 .with_safe_default_protocol_versions()
259 .expect("protocols supported by ring")
260 .with_no_client_auth();
261 let server_tls_config = match tls_config.cert {
262 CertConfig::LetsEncrypt { config } => {
263 let mut state = config.state();
264 let server_config =
265 server_config.with_cert_resolver(state.resolver());
266 let acceptor =
267 http_server::TlsAcceptor::LetsEncrypt(state.acceptor());
268 tasks.spawn(
269 async move {
270 while let Some(event) = state.next().await {
271 match event {
272 Ok(ok) => debug!("acme event: {ok:?}"),
273 Err(err) => error!("error: {err:?}"),
274 }
275 }
276 Err(anyhow!("acme event stream finished"))
277 }
278 .instrument(info_span!("acme")),
279 );
280 Some(http_server::TlsConfig {
281 config: Arc::new(server_config),
282 acceptor,
283 })
284 }
285 CertConfig::Manual { private_key, certs } => {
286 let server_config =
287 server_config.with_single_cert(certs.clone(), private_key)?;
288 let server_config = Arc::new(server_config);
289 let acceptor =
290 tokio_rustls::TlsAcceptor::from(server_config.clone());
291 let acceptor = http_server::TlsAcceptor::Manual(acceptor);
292 Some(http_server::TlsConfig {
293 config: server_config,
294 acceptor,
295 })
296 }
297 };
298 builder = builder.tls_config(server_tls_config);
299
300 let http_listener = TcpListener::bind(&relay_config.http_bind_addr)
303 .await
304 .context("failed to bind http")?;
305 let http_addr = http_listener.local_addr()?;
306 tasks.spawn(
307 run_captive_portal_service(http_listener)
308 .instrument(info_span!("http-service", addr = %http_addr)),
309 );
310 Some(http_addr)
311 }
312 None => {
313 builder = builder.request_handler(
316 Method::GET,
317 "/generate_204",
318 Box::new(serve_no_content_handler),
319 );
320 None
321 }
322 };
323 let relay_server = builder.spawn().await?;
324 (Some(relay_server), http_addr)
325 }
326 None => (None, None),
327 };
328 let relay_addr = relay_server.as_ref().map(|srv| srv.addr());
331 let relay_handle = relay_server.as_ref().map(|srv| srv.handle());
332 let task = tokio::spawn(relay_supervisor(tasks, relay_server));
333 Ok(Self {
334 http_addr: http_addr.or(relay_addr),
335 stun_addr,
336 https_addr: http_addr.and(relay_addr),
337 relay_handle,
338 supervisor: AbortOnDropHandle::new(task),
339 })
340 }
341
342 pub async fn shutdown(self) -> Result<()> {
346 if let Some(handle) = self.relay_handle {
349 handle.shutdown();
350 }
351 self.supervisor.await?
352 }
353
354 pub fn task_handle(&mut self) -> &mut AbortOnDropHandle<Result<()>> {
359 &mut self.supervisor
360 }
361
362 pub fn https_addr(&self) -> Option<SocketAddr> {
364 self.https_addr
365 }
366
367 pub fn http_addr(&self) -> Option<SocketAddr> {
369 self.http_addr
370 }
371
372 pub fn stun_addr(&self) -> Option<SocketAddr> {
374 self.stun_addr
375 }
376}
377
378#[instrument(skip_all)]
383async fn relay_supervisor(
384 mut tasks: JoinSet<Result<()>>,
385 mut relay_http_server: Option<http_server::Server>,
386) -> Result<()> {
387 let res = match (relay_http_server.as_mut(), tasks.len()) {
388 (None, _) => tasks
389 .join_next()
390 .await
391 .unwrap_or_else(|| Ok(Err(anyhow!("Nothing to supervise")))),
392 (Some(relay), 0) => relay.task_handle().await.map(anyhow::Ok),
393 (Some(relay), _) => {
394 tokio::select! {
395 biased;
396 Some(ret) = tasks.join_next() => ret,
397 ret = relay.task_handle() => ret.map(anyhow::Ok),
398 else => Ok(Err(anyhow!("Empty JoinSet (unreachable)"))),
399 }
400 }
401 };
402 let ret = match res {
403 Ok(Ok(())) => {
404 debug!("Task exited");
405 Ok(())
406 }
407 Ok(Err(err)) => {
408 error!(%err, "Task failed");
409 Err(err.context("task failed"))
410 }
411 Err(err) => {
412 if let Ok(panic) = err.try_into_panic() {
413 error!("Task panicked");
414 std::panic::resume_unwind(panic);
415 }
416 debug!("Task cancelled");
417 Err(anyhow!("task cancelled"))
418 }
419 };
420
421 if let Some(server) = relay_http_server {
424 server.shutdown();
425 }
426
427 tasks.shutdown().await;
428
429 ret
430}
431
432async fn server_stun_listener(sock: UdpSocket) -> Result<()> {
436 info!(addr = ?sock.local_addr().ok(), "running STUN server");
437 let sock = Arc::new(sock);
438 let mut buffer = vec![0u8; 64 << 10];
439 let mut tasks = JoinSet::new();
440 loop {
441 tokio::select! {
442 biased;
443 _ = tasks.join_next(), if !tasks.is_empty() => (),
444 res = sock.recv_from(&mut buffer) => {
445 match res {
446 Ok((n, src_addr)) => {
447 inc!(StunMetrics, requests);
448 let pkt = &buffer[..n];
449 if !stun::is(pkt) {
450 debug!(%src_addr, "STUN: ignoring non stun packet");
451 inc!(StunMetrics, bad_requests);
452 continue;
453 }
454 let pkt = pkt.to_vec();
455 tasks.spawn(handle_stun_request(src_addr, pkt, sock.clone()));
456 }
457 Err(err) => {
458 inc!(StunMetrics, failures);
459 warn!("failed to recv: {err:#}");
460 }
461 }
462 }
463 }
464 }
465}
466
467async fn handle_stun_request(src_addr: SocketAddr, pkt: Vec<u8>, sock: Arc<UdpSocket>) {
469 let handle =
470 AbortOnDropHandle::new(tokio::task::spawn_blocking(
471 move || match stun::parse_binding_request(&pkt) {
472 Ok(txid) => {
473 debug!(%src_addr, %txid, "STUN: received binding request");
474 Some((txid, stun::response(txid, src_addr)))
475 }
476 Err(err) => {
477 inc!(StunMetrics, bad_requests);
478 warn!(%src_addr, "STUN: invalid binding request: {:?}", err);
479 None
480 }
481 },
482 ));
483 let (txid, response) = match handle.await {
484 Ok(Some(val)) => val,
485 Ok(None) => return,
486 Err(err) => {
487 error!("{err:#}");
488 return;
489 }
490 };
491 match sock.send_to(&response, src_addr).await {
492 Ok(len) => {
493 if len != response.len() {
494 warn!(
495 %src_addr,
496 %txid,
497 "failed to write response, {len}/{} bytes sent",
498 response.len()
499 );
500 } else {
501 match src_addr {
502 SocketAddr::V4(_) => inc!(StunMetrics, ipv4_success),
503 SocketAddr::V6(_) => inc!(StunMetrics, ipv6_success),
504 }
505 }
506 trace!(%src_addr, %txid, "sent {len} bytes");
507 }
508 Err(err) => {
509 inc!(StunMetrics, failures);
510 warn!(%src_addr, %txid, "failed to write response: {err:#}");
511 }
512 }
513}
514
515fn root_handler(
516 _r: Request<Incoming>,
517 response: ResponseBuilder,
518) -> HyperResult<Response<BytesBody>> {
519 response
520 .status(StatusCode::OK)
521 .header("Content-Type", "text/html; charset=utf-8")
522 .body(INDEX.into())
523 .map_err(|err| Box::new(err) as HyperError)
524}
525
526fn probe_handler(
528 _r: Request<Incoming>,
529 response: ResponseBuilder,
530) -> HyperResult<Response<BytesBody>> {
531 response
532 .status(StatusCode::OK)
533 .header("Access-Control-Allow-Origin", "*")
534 .body(body_empty())
535 .map_err(|err| Box::new(err) as HyperError)
536}
537
538fn robots_handler(
539 _r: Request<Incoming>,
540 response: ResponseBuilder,
541) -> HyperResult<Response<BytesBody>> {
542 response
543 .status(StatusCode::OK)
544 .body(ROBOTS_TXT.into())
545 .map_err(|err| Box::new(err) as HyperError)
546}
547
548fn serve_no_content_handler<B: hyper::body::Body>(
550 r: Request<B>,
551 mut response: ResponseBuilder,
552) -> HyperResult<Response<BytesBody>> {
553 if let Some(challenge) = r.headers().get(NO_CONTENT_CHALLENGE_HEADER) {
554 if !challenge.is_empty()
555 && challenge.len() < 64
556 && challenge
557 .as_bytes()
558 .iter()
559 .all(|c| is_challenge_char(*c as char))
560 {
561 response = response.header(
562 NO_CONTENT_RESPONSE_HEADER,
563 format!("response {}", challenge.to_str()?),
564 );
565 }
566 }
567
568 response
569 .status(StatusCode::NO_CONTENT)
570 .body(body_empty())
571 .map_err(|err| Box::new(err) as HyperError)
572}
573
/// Returns whether `c` may appear in a captive-portal challenge.
///
/// Accepted are ASCII letters, ASCII digits, and the characters `.`, `-` and `_`.
fn is_challenge_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_')
}
583
584async fn run_captive_portal_service(http_listener: TcpListener) -> Result<()> {
586 info!("serving");
587
588 let mut tasks = JoinSet::new();
590
591 loop {
592 match http_listener.accept().await {
593 Ok((stream, peer_addr)) => {
594 debug!(%peer_addr, "Connection opened",);
595 let handler = CaptivePortalService;
596
597 tasks.spawn(async move {
598 let stream = crate::relay::server::streams::MaybeTlsStream::Plain(stream);
599 let stream = hyper_util::rt::TokioIo::new(stream);
600 if let Err(err) = hyper::server::conn::http1::Builder::new()
601 .serve_connection(stream, handler)
602 .with_upgrades()
603 .await
604 {
605 error!("Failed to serve connection: {err:?}");
606 }
607 });
608 }
609 Err(err) => {
610 error!(
611 "[CaptivePortalService] failed to accept connection: {:#?}",
612 err
613 );
614 }
615 }
616 }
617}
618
619#[derive(Clone)]
620struct CaptivePortalService;
621
622impl hyper::service::Service<Request<Incoming>> for CaptivePortalService {
623 type Response = Response<BytesBody>;
624 type Error = HyperError;
625 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
626
627 fn call(&self, req: Request<Incoming>) -> Self::Future {
628 match (req.method(), req.uri().path()) {
629 (&Method::GET, "/generate_204") => {
631 Box::pin(async move { serve_no_content_handler(req, Response::builder()) })
632 }
633 _ => {
634 let r = Response::builder()
636 .status(StatusCode::NOT_FOUND)
637 .body(NOTFOUND.into())
638 .map_err(|err| Box::new(err) as HyperError);
639 Box::pin(async move { r })
640 }
641 }
642 }
643}
644
/// Metrics collected by the STUN server.
mod stun_metrics {
    use iroh_metrics::{
        core::{Counter, Metric},
        struct_iterable::Iterable,
    };

    /// Counters for the STUN server, registered under the metric name `stun`.
    // The lint is silenced by the original code; the fields are documented below anyway.
    #[allow(missing_docs)]
    #[derive(Debug, Clone, Iterable)]
    pub struct StunMetrics {
        /// Number of STUN requests made to the server.
        pub requests: Counter,
        /// Number of successful IPv4 STUN requests served.
        pub ipv4_success: Counter,
        /// Number of successful IPv6 STUN requests served.
        pub ipv6_success: Counter,

        /// Number of bad requests made to the STUN endpoint.
        pub bad_requests: Counter,
        /// Number of STUN requests that end in failure.
        pub failures: Counter,
    }

    impl Default for StunMetrics {
        /// Creates all counters with their descriptions.
        fn default() -> Self {
            Self {
                requests: Counter::new("Number of STUN requests made to the server."),
                ipv4_success: Counter::new("Number of successful ipv4 STUN requests served."),
                ipv6_success: Counter::new("Number of successful ipv6 STUN requests served."),
                bad_requests: Counter::new("Number of bad requests made to the STUN endpoint."),
                failures: Counter::new("Number of STUN requests that end in failure."),
            }
        }
    }

    impl Metric for StunMetrics {
        /// The name under which these metrics are registered.
        fn name() -> &'static str {
            "stun"
        }
    }
}
692
#[cfg(test)]
mod tests {
    use std::{net::Ipv4Addr, time::Duration};

    use bytes::Bytes;
    use http::header::UPGRADE;
    use iroh_base::{key::SecretKey, node_addr::RelayUrl};

    use super::*;
    use crate::relay::{
        client::{conn::ReceivedMessage, ClientBuilder},
        http::{Protocol, HTTP_UPGRADE_PROTOCOL},
    };

    /// Spawns a plain HTTP relay server on an OS-assigned localhost port.
    async fn spawn_local_relay() -> Result<Server> {
        Server::spawn(ServerConfig::<(), ()> {
            relay: Some(RelayConfig {
                http_bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),
                tls: None,
                limits: Default::default(),
            }),
            stun: None,
            metrics_addr: None,
        })
        .await
    }

    /// Connects two clients to a fresh local relay and sends one message in each
    /// direction, asserting that both arrive unchanged from the expected sender.
    ///
    /// `protocol_a` / `protocol_b` select the protocol of the respective client; `None`
    /// leaves the builder's default untouched.
    async fn check_relay_round_trip(protocol_a: Option<Protocol>, protocol_b: Option<Protocol>) {
        let server = spawn_local_relay().await.unwrap();
        let relay_url = format!("http://{}", server.http_addr().unwrap());
        let relay_url: RelayUrl = relay_url.parse().unwrap();

        let build_client = |secret_key: SecretKey, protocol: Option<Protocol>| {
            let builder = ClientBuilder::new(relay_url.clone());
            let builder = match protocol {
                Some(protocol) => builder.protocol(protocol),
                None => builder,
            };
            let resolver = crate::dns::default_resolver().clone();
            builder.build(secret_key, resolver)
        };

        // Set up client a.
        let a_secret_key = SecretKey::generate();
        let a_key = a_secret_key.public();
        let (client_a, mut client_a_receiver) = build_client(a_secret_key, protocol_a);
        let connect_client = client_a.clone();

        // Give the relay server some time to accept connections.
        if let Err(err) = tokio::time::timeout(Duration::from_secs(10), async move {
            loop {
                match connect_client.connect().await {
                    Ok(_) => break,
                    Err(err) => {
                        warn!("client unable to connect to relay server: {err:#}");
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        })
        .await
        {
            panic!("error connecting to relay server: {err:#}");
        }

        // Set up client b.
        let b_secret_key = SecretKey::generate();
        let b_key = b_secret_key.public();
        let (client_b, mut client_b_receiver) = build_client(b_secret_key, protocol_b);
        client_b.connect().await.unwrap();

        // Send a message from a to b.
        let msg = Bytes::from("hello, b");
        client_a.send(b_key, msg.clone()).await.unwrap();

        let res = client_b_receiver.recv().await.unwrap().unwrap();
        if let ReceivedMessage::ReceivedPacket { source, data } = res {
            assert_eq!(a_key, source);
            assert_eq!(msg, data);
        } else {
            panic!("client_b received unexpected message {res:?}");
        }

        // Send a message from b to a.
        let msg = Bytes::from("howdy, a");
        client_b.send(a_key, msg.clone()).await.unwrap();

        let res = client_a_receiver.recv().await.unwrap().unwrap();
        if let ReceivedMessage::ReceivedPacket { source, data } = res {
            assert_eq!(b_key, source);
            assert_eq!(msg, data);
        } else {
            panic!("client_a received unexpected message {res:?}");
        }
    }

    #[tokio::test]
    async fn test_no_services() {
        let _guard = iroh_test::logging::setup();
        let mut server = Server::spawn(ServerConfig::<(), ()>::default())
            .await
            .unwrap();
        let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle())
            .await
            .expect("timeout, server not finished")
            .expect("server task JoinError");
        assert!(res.is_err());
    }

    #[tokio::test]
    async fn test_conflicting_bind() {
        let _guard = iroh_test::logging::setup();
        // The relay and the metrics server are both given port 1234, so the
        // supervisor is expected to end with an error.
        let mut server = Server::spawn(ServerConfig::<(), ()> {
            relay: Some(RelayConfig {
                http_bind_addr: (Ipv4Addr::LOCALHOST, 1234).into(),
                tls: None,
                limits: Default::default(),
            }),
            stun: None,
            metrics_addr: Some((Ipv4Addr::LOCALHOST, 1234).into()),
        })
        .await
        .unwrap();
        let res = tokio::time::timeout(Duration::from_secs(5), server.task_handle())
            .await
            .expect("timeout, server not finished")
            .expect("server task JoinError");
        assert!(res.is_err());
    }

    #[tokio::test]
    async fn test_root_handler() {
        let _guard = iroh_test::logging::setup();
        let server = spawn_local_relay().await.unwrap();
        let url = format!("http://{}", server.http_addr().unwrap());

        let response = reqwest::get(&url).await.unwrap();
        assert_eq!(response.status(), 200);
        let body = response.text().await.unwrap();
        assert!(body.contains("iroh.computer"));
    }

    #[tokio::test]
    async fn test_captive_portal_service() {
        let _guard = iroh_test::logging::setup();
        let server = spawn_local_relay().await.unwrap();
        let url = format!("http://{}/generate_204", server.http_addr().unwrap());
        let challenge = "123az__.";

        let client = reqwest::Client::new();
        let response = client
            .get(&url)
            .header(NO_CONTENT_CHALLENGE_HEADER, challenge)
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::NO_CONTENT);
        let header = response.headers().get(NO_CONTENT_RESPONSE_HEADER).unwrap();
        assert_eq!(header.to_str().unwrap(), format!("response {challenge}"));
        let body = response.text().await.unwrap();
        assert!(body.is_empty());
    }

    #[tokio::test]
    async fn test_relay_client_legacy_route() {
        let _guard = iroh_test::logging::setup();
        let server = spawn_local_relay().await.unwrap();
        // The legacy `/derp` path is requested with an upgrade header.
        let endpoint_url = format!("http://{}/derp", server.http_addr().unwrap());

        let client = reqwest::Client::new();
        let result = client
            .get(endpoint_url)
            .header(UPGRADE, HTTP_UPGRADE_PROTOCOL)
            .send()
            .await
            .unwrap();

        assert_eq!(result.status(), StatusCode::SWITCHING_PROTOCOLS);
    }

    #[tokio::test]
    async fn test_relay_clients_both_derp() {
        let _guard = iroh_test::logging::setup();
        check_relay_round_trip(None, None).await;
    }

    #[tokio::test]
    async fn test_relay_clients_both_websockets() {
        let _guard = iroh_test::logging::setup();
        check_relay_round_trip(Some(Protocol::Websocket), Some(Protocol::Websocket)).await;
    }

    #[tokio::test]
    async fn test_relay_clients_websocket_and_derp() {
        let _guard = iroh_test::logging::setup();
        check_relay_round_trip(None, Some(Protocol::Websocket)).await;
    }

    #[tokio::test]
    async fn test_stun() {
        let _guard = iroh_test::logging::setup();
        let server = Server::spawn(ServerConfig::<(), ()> {
            relay: None,
            stun: Some(StunConfig {
                bind_addr: (Ipv4Addr::LOCALHOST, 0).into(),
            }),
            metrics_addr: None,
        })
        .await
        .unwrap();

        let txid = stun::TransactionId::default();
        let req = stun::request(txid);
        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        socket
            .send_to(&req, server.stun_addr().unwrap())
            .await
            .unwrap();

        // Receive the response and check it echoes our transaction id and address.
        let mut buf = vec![0u8; 64000];
        let (len, addr) = socket.recv_from(&mut buf).await.unwrap();
        assert_eq!(addr, server.stun_addr().unwrap());
        buf.truncate(len);
        let (txid_back, response_addr) = stun::parse_response(&buf).unwrap();
        assert_eq!(txid, txid_back);
        assert_eq!(response_addr, socket.local_addr().unwrap());
    }
}