1use std::{
4 collections::HashMap,
5 net::{IpAddr, SocketAddr},
6 sync::Arc,
7 time::Duration,
8};
9
10use base64::{engine::general_purpose::URL_SAFE, Engine as _};
11use bytes::Bytes;
12use conn::{Conn, ConnBuilder, ConnReader, ConnReceiver, ConnWriter, ReceivedMessage};
13use futures_lite::future::Boxed as BoxFuture;
14use futures_util::StreamExt;
15use http_body_util::Empty;
16use hyper::{
17 body::Incoming,
18 header::{HOST, UPGRADE},
19 upgrade::Parts,
20 Request,
21};
22use hyper_util::rt::TokioIo;
23use rand::Rng;
24use rustls::client::Resumption;
25use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream};
26use tokio::{
27 io::{AsyncRead, AsyncWrite},
28 net::TcpStream,
29 sync::{mpsc, oneshot},
30 task::JoinSet,
31 time::Instant,
32};
33use tokio_util::{
34 codec::{FramedRead, FramedWrite},
35 task::AbortOnDropHandle,
36};
37use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level};
38use url::Url;
39
40use crate::{
41 defaults::timeouts::relay::*,
42 dns::{DnsResolver, ResolverExt},
43 key::{NodeId, PublicKey, SecretKey},
44 relay::{
45 codec::DerpCodec,
46 http::{Protocol, RELAY_PATH},
47 RelayUrl,
48 },
49 util::chain,
50};
51
52pub(crate) mod conn;
53pub(crate) mod streams;
54
55#[derive(Debug, thiserror::Error)]
57pub enum ClientError {
58 #[error("client is closed")]
60 Closed,
61 #[error("no relay client")]
63 NoClient,
64 #[error("error sending a packet")]
66 Send,
67 #[error("error receiving a packet: {0:?}")]
69 Receive(anyhow::Error),
70 #[error("connect timeout")]
72 ConnectTimeout,
73 #[error("Relay node is not available")]
75 RelayNodeNotAvail,
76 #[error("no nodes available for {0}")]
78 NoNodeForTarget(String),
79 #[error("no relay nodes found for {0}, only are stun_only nodes")]
81 StunOnlyNodesFound(String),
82 #[error("dial error")]
84 DialIO(#[from] std::io::Error),
85 #[error("dial error")]
87 DialTask(#[from] tokio::task::JoinError),
88 #[error("both IPv4 and IPv6 are explicitly disabled for this node")]
90 IPDisabled,
91 #[error("no local addr: {0}")]
93 NoLocalAddr(String),
94 #[error("http connection error")]
96 Hyper(#[from] hyper::Error),
97 #[error("http error")]
99 Http(#[from] http::Error),
100 #[error("unexpected status code: expected {0}, got {1}")]
102 UnexpectedStatusCode(hyper::StatusCode, hyper::StatusCode),
103 #[error("failed to upgrade connection: {0}")]
105 Upgrade(String),
106 #[error("failed to proxy connection: {0}")]
108 Proxy(String),
109 #[error("failed to build relay client: {0}")]
111 Build(String),
112 #[error("ping timeout")]
114 PingTimeout,
115 #[error("ping aborted")]
117 PingAborted,
118 #[error("cannot acknowledge pings")]
120 CannotAckPings,
121 #[error("invalid url: {0}")]
123 InvalidUrl(String),
124 #[error("dns: {0:?}")]
126 Dns(Option<anyhow::Error>),
127 #[error("dns timeout")]
129 DnsTimeout,
130 #[error("actor gone")]
132 ActorGone,
133 #[error("websocket error: {0}")]
135 WebsocketError(#[from] tokio_tungstenite_wasm::Error),
136}
137
138#[derive(Clone, Debug)]
142pub struct Client {
143 inner: mpsc::Sender<ActorMessage>,
144 public_key: PublicKey,
145 #[allow(dead_code)]
146 recv_loop: Arc<AbortOnDropHandle<()>>,
147}
148
149#[derive(Debug)]
150enum ActorMessage {
151 Connect(oneshot::Sender<Result<Conn, ClientError>>),
152 NotePreferred(bool),
153 LocalAddr(oneshot::Sender<Result<Option<SocketAddr>, ClientError>>),
154 Ping(oneshot::Sender<Result<Duration, ClientError>>),
155 Pong([u8; 8], oneshot::Sender<Result<(), ClientError>>),
156 Send(PublicKey, Bytes, oneshot::Sender<Result<(), ClientError>>),
157 Close(oneshot::Sender<Result<(), ClientError>>),
158 CloseForReconnect(oneshot::Sender<Result<(), ClientError>>),
159 IsConnected(oneshot::Sender<Result<bool, ClientError>>),
160}
161
162#[derive(Debug)]
164pub struct ClientReceiver {
165 msg_receiver: mpsc::Receiver<Result<ReceivedMessage, ClientError>>,
166}
167
168#[derive(derive_more::Debug)]
169struct Actor {
170 secret_key: SecretKey,
171 can_ack_pings: bool,
172 is_preferred: bool,
173 relay_conn: Option<(Conn, ConnReceiver)>,
174 is_closed: bool,
175 #[debug("address family selector callback")]
176 address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
177 url: RelayUrl,
178 protocol: Protocol,
179 #[debug("TlsConnector")]
180 tls_connector: tokio_rustls::TlsConnector,
181 pings: PingTracker,
182 ping_tasks: JoinSet<()>,
183 dns_resolver: DnsResolver,
184 proxy_url: Option<Url>,
185}
186
187#[derive(Default, Debug)]
188struct PingTracker(HashMap<[u8; 8], oneshot::Sender<()>>);
189
190impl PingTracker {
191 fn register(&mut self) -> ([u8; 8], oneshot::Receiver<()>) {
194 let data = rand::thread_rng().gen::<[u8; 8]>();
195 let (send, recv) = oneshot::channel();
196 self.0.insert(data, send);
197 (data, recv)
198 }
199
200 fn unregister(&mut self, data: [u8; 8], why: &'static str) -> Option<oneshot::Sender<()>> {
204 trace!("removing ping {}: {}", hex::encode(data), why);
205 self.0.remove(&data)
206 }
207}
208
209#[derive(derive_more::Debug)]
211pub struct ClientBuilder {
212 can_ack_pings: bool,
214 is_preferred: bool,
216 #[debug("address family selector callback")]
218 address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
219 is_prober: bool,
221 server_public_key: Option<PublicKey>,
223 url: RelayUrl,
225 protocol: Protocol,
227 #[cfg(any(test, feature = "test-utils"))]
229 #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
230 insecure_skip_cert_verify: bool,
231 proxy_url: Option<Url>,
233}
234
235impl ClientBuilder {
236 pub fn new(url: impl Into<RelayUrl>) -> Self {
238 ClientBuilder {
239 can_ack_pings: false,
240 is_preferred: false,
241 address_family_selector: None,
242 is_prober: false,
243 server_public_key: None,
244 url: url.into(),
245 protocol: Protocol::Relay,
246 #[cfg(any(test, feature = "test-utils"))]
247 insecure_skip_cert_verify: false,
248 proxy_url: None,
249 }
250 }
251
252 pub fn server_url(mut self, url: impl Into<RelayUrl>) -> Self {
254 self.url = url.into();
255 self
256 }
257
258 pub fn protocol(mut self, protocol: Protocol) -> Self {
261 self.protocol = protocol;
262 self
263 }
264
265 pub fn address_family_selector<S>(mut self, selector: S) -> Self
272 where
273 S: Fn() -> BoxFuture<bool> + Send + Sync + 'static,
274 {
275 self.address_family_selector = Some(Box::new(selector));
276 self
277 }
278
279 pub fn can_ack_pings(mut self, can: bool) -> Self {
281 self.can_ack_pings = can;
282 self
283 }
284
285 pub fn is_preferred(mut self, is: bool) -> Self {
288 self.is_preferred = is;
289 self
290 }
291
292 pub fn is_prober(mut self, is: bool) -> Self {
294 self.is_prober = is;
295 self
296 }
297
298 #[cfg(any(test, feature = "test-utils"))]
302 #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
303 pub fn insecure_skip_cert_verify(mut self, skip: bool) -> Self {
304 self.insecure_skip_cert_verify = skip;
305 self
306 }
307
308 pub fn proxy_url(mut self, url: Url) -> Self {
310 self.proxy_url.replace(url);
311 self
312 }
313
314 pub fn build(self, key: SecretKey, dns_resolver: DnsResolver) -> (Client, ClientReceiver) {
316 let roots = rustls::RootCertStore {
318 roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
319 };
320 let mut config = rustls::client::ClientConfig::builder_with_provider(Arc::new(
321 rustls::crypto::ring::default_provider(),
322 ))
323 .with_safe_default_protocol_versions()
324 .expect("protocols supported by ring")
325 .with_root_certificates(roots)
326 .with_no_client_auth();
327 #[cfg(any(test, feature = "test-utils"))]
328 if self.insecure_skip_cert_verify {
329 warn!("Insecure config: SSL certificates from relay servers will be trusted without verification");
330 config
331 .dangerous()
332 .set_certificate_verifier(Arc::new(NoCertVerifier));
333 }
334
335 config.resumption = Resumption::default();
336
337 let tls_connector: tokio_rustls::TlsConnector = Arc::new(config).into();
338 let public_key = key.public();
339
340 let inner = Actor {
341 secret_key: key,
342 can_ack_pings: self.can_ack_pings,
343 is_preferred: self.is_preferred,
344 relay_conn: None,
345 is_closed: false,
346 address_family_selector: self.address_family_selector,
347 pings: PingTracker::default(),
348 ping_tasks: Default::default(),
349 url: self.url,
350 protocol: self.protocol,
351 tls_connector,
352 dns_resolver,
353 proxy_url: self.proxy_url,
354 };
355
356 let (msg_sender, inbox) = mpsc::channel(64);
357 let (s, r) = mpsc::channel(64);
358 let recv_loop = tokio::task::spawn(
359 async move { inner.run(inbox, s).await }.instrument(info_span!("client")),
360 );
361
362 (
363 Client {
364 public_key,
365 inner: msg_sender,
366 recv_loop: Arc::new(AbortOnDropHandle::new(recv_loop)),
367 },
368 ClientReceiver { msg_receiver: r },
369 )
370 }
371
372 pub fn server_public_key(mut self, server_public_key: PublicKey) -> Self {
374 self.server_public_key = Some(server_public_key);
375 self
376 }
377}
378
379impl ClientReceiver {
380 pub async fn recv(&mut self) -> Option<Result<ReceivedMessage, ClientError>> {
382 self.msg_receiver.recv().await
383 }
384}
385
386impl Client {
387 pub fn public_key(&self) -> PublicKey {
389 self.public_key
390 }
391
392 async fn send_actor<F, T>(&self, msg_create: F) -> Result<T, ClientError>
393 where
394 F: FnOnce(oneshot::Sender<Result<T, ClientError>>) -> ActorMessage,
395 {
396 let (s, r) = oneshot::channel();
397 let msg = msg_create(s);
398 match self.inner.send(msg).await {
399 Ok(_) => {
400 let res = r.await.map_err(|_| ClientError::ActorGone)??;
401 Ok(res)
402 }
403 Err(_) => Err(ClientError::ActorGone),
404 }
405 }
406
407 pub async fn connect(&self) -> Result<Conn, ClientError> {
414 self.send_actor(ActorMessage::Connect).await
415 }
416
417 pub async fn note_preferred(&self, is_preferred: bool) {
419 self.inner
420 .send(ActorMessage::NotePreferred(is_preferred))
421 .await
422 .ok();
423 }
424
425 pub async fn local_addr(&self) -> Option<SocketAddr> {
428 self.send_actor(ActorMessage::LocalAddr)
429 .await
430 .ok()
431 .flatten()
432 }
433
434 pub async fn ping(&self) -> Result<Duration, ClientError> {
438 self.send_actor(ActorMessage::Ping).await
439 }
440
441 pub async fn send_pong(&self, data: [u8; 8]) -> Result<(), ClientError> {
449 self.send_actor(|s| ActorMessage::Pong(data, s)).await
450 }
451
452 pub async fn send(&self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> {
460 self.send_actor(|s| ActorMessage::Send(dst_key, b, s)).await
461 }
462
463 pub async fn close(self) -> Result<(), ClientError> {
465 self.send_actor(ActorMessage::Close).await
466 }
467
468 pub async fn close_for_reconnect(&self) -> Result<(), ClientError> {
470 self.send_actor(ActorMessage::CloseForReconnect).await
471 }
472
473 pub async fn is_connected(&self) -> Result<bool, ClientError> {
475 self.send_actor(ActorMessage::IsConnected).await
476 }
477}
478
479impl Actor {
480 async fn run(
481 mut self,
482 mut inbox: mpsc::Receiver<ActorMessage>,
483 msg_sender: mpsc::Sender<Result<ReceivedMessage, ClientError>>,
484 ) {
485 if let Err(err) = self.connect("initial connect").await {
487 msg_sender.send(Err(err)).await.ok();
488 }
489
490 loop {
491 tokio::select! {
492 res = self.recv_detail() => {
493 if let Ok(ReceivedMessage::Pong(ping)) = res {
494 match self.pings.unregister(ping, "pong") {
495 Some(chan) => {
496 if chan.send(()).is_err() {
497 warn!("pong received for ping {ping:?}, but the receiving channel was closed");
498 }
499 }
500 None => {
501 warn!("pong received for ping {ping:?}, but not registered");
502 }
503 }
504 continue;
505 }
506 msg_sender.send(res).await.ok();
507 }
508 Some(msg) = inbox.recv() => {
509 match msg {
510 ActorMessage::Connect(s) => {
511 let res = self.connect("actor msg").await.map(|(client, _)| (client));
512 s.send(res).ok();
513 },
514 ActorMessage::NotePreferred(is_preferred) => {
515 self.note_preferred(is_preferred).await;
516 },
517 ActorMessage::LocalAddr(s) => {
518 let res = self.local_addr();
519 s.send(Ok(res)).ok();
520 },
521 ActorMessage::Ping(s) => {
522 self.ping(s).await;
523 },
524 ActorMessage::Pong(data, s) => {
525 let res = self.send_pong(data).await;
526 s.send(res).ok();
527 },
528 ActorMessage::Send(key, data, s) => {
529 let res = self.send(key, data).await;
530 s.send(res).ok();
531 },
532 ActorMessage::Close(s) => {
533 let res = self.close().await;
534 s.send(Ok(res)).ok();
535 break;
537 },
538 ActorMessage::CloseForReconnect(s) => {
539 let res = self.close_for_reconnect().await;
540 s.send(Ok(res)).ok();
541 },
542 ActorMessage::IsConnected(s) => {
543 let res = self.is_connected();
544 s.send(Ok(res)).ok();
545 },
546 }
547 }
548 else => {
549 self.close().await;
551 break;
552 }
553 }
554 }
555 }
556
557 async fn connect(
566 &mut self,
567 why: &'static str,
568 ) -> Result<(Conn, &'_ mut ConnReceiver), ClientError> {
569 if self.is_closed {
570 return Err(ClientError::Closed);
571 }
572 let url = self.url.clone();
573 async move {
574 if self.relay_conn.is_none() {
575 trace!("no connection, trying to connect");
576 let (conn, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
577 .await
578 .map_err(|_| ClientError::ConnectTimeout)??;
579
580 self.relay_conn = Some((conn, receiver));
581 } else {
582 trace!("already had connection");
583 }
584 let (conn, receiver) = self
585 .relay_conn
586 .as_mut()
587 .map(|(c, r)| (c.clone(), r))
588 .expect("just checked");
589
590 Ok((conn, receiver))
591 }
592 .instrument(info_span!("connect", %url, %why))
593 .await
594 }
595
596 async fn connect_0(&self) -> Result<(Conn, ConnReceiver), ClientError> {
597 let (reader, writer, local_addr) = match self.protocol {
598 Protocol::Websocket => {
599 let (reader, writer) = self.connect_ws().await?;
600 let local_addr = None;
601 (reader, writer, local_addr)
602 }
603 Protocol::Relay => {
604 let (reader, writer, local_addr) = self.connect_derp().await?;
605 (reader, writer, Some(local_addr))
606 }
607 };
608
609 let (conn, receiver) =
610 ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
611 .build()
612 .await
613 .map_err(|e| ClientError::Build(e.to_string()))?;
614
615 if self.is_preferred && conn.note_preferred(true).await.is_err() {
616 conn.close().await;
617 return Err(ClientError::Send);
618 }
619
620 event!(
621 target: "events.net.relay.connected",
622 Level::DEBUG,
623 home = self.is_preferred,
624 url = %self.url,
625 );
626
627 trace!("connect_0 done");
628 Ok((conn, receiver))
629 }
630
631 async fn connect_ws(&self) -> Result<(ConnReader, ConnWriter), ClientError> {
632 let mut dial_url = (*self.url).clone();
633 dial_url.set_path(RELAY_PATH);
634 dial_url
637 .set_scheme(if self.use_tls() { "wss" } else { "ws" })
638 .map_err(|()| ClientError::InvalidUrl(self.url.to_string()))?;
639
640 debug!(%dial_url, "Dialing relay by websocket");
641
642 let (writer, reader) = tokio_tungstenite_wasm::connect(dial_url).await?.split();
643
644 let reader = ConnReader::Ws(reader);
645 let writer = ConnWriter::Ws(writer);
646
647 Ok((reader, writer))
648 }
649
650 async fn connect_derp(&self) -> Result<(ConnReader, ConnWriter, SocketAddr), ClientError> {
651 let url = self.url.clone();
652 let tcp_stream = self.dial_url().await?;
653
654 let local_addr = tcp_stream
655 .local_addr()
656 .map_err(|e| ClientError::NoLocalAddr(e.to_string()))?;
657
658 debug!(server_addr = ?tcp_stream.peer_addr(), %local_addr, "TCP stream connected");
659
660 let response = if self.use_tls() {
661 debug!("Starting TLS handshake");
662 let hostname = self
663 .tls_servername()
664 .ok_or_else(|| ClientError::InvalidUrl("No tls servername".into()))?;
665 let hostname = hostname.to_owned();
666 let tls_stream = self.tls_connector.connect(hostname, tcp_stream).await?;
667 debug!("tls_connector connect success");
668 Self::start_upgrade(tls_stream, url).await?
669 } else {
670 debug!("Starting handshake");
671 Self::start_upgrade(tcp_stream, url).await?
672 };
673
674 if response.status() != hyper::StatusCode::SWITCHING_PROTOCOLS {
675 error!(
676 "expected status 101 SWITCHING_PROTOCOLS, got: {}",
677 response.status()
678 );
679 return Err(ClientError::UnexpectedStatusCode(
680 hyper::StatusCode::SWITCHING_PROTOCOLS,
681 response.status(),
682 ));
683 }
684
685 debug!("starting upgrade");
686 let upgraded = match hyper::upgrade::on(response).await {
687 Ok(upgraded) => upgraded,
688 Err(err) => {
689 warn!("upgrade failed: {:#}", err);
690 return Err(ClientError::Hyper(err));
691 }
692 };
693
694 debug!("connection upgraded");
695 let (reader, writer) =
696 downcast_upgrade(upgraded).map_err(|e| ClientError::Upgrade(e.to_string()))?;
697
698 let reader = ConnReader::Derp(FramedRead::new(reader, DerpCodec));
699 let writer = ConnWriter::Derp(FramedWrite::new(writer, DerpCodec));
700
701 Ok((reader, writer, local_addr))
702 }
703
704 async fn start_upgrade<T>(
706 io: T,
707 relay_url: RelayUrl,
708 ) -> Result<hyper::Response<Incoming>, ClientError>
709 where
710 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
711 {
712 let host_header_value = host_header_value(relay_url)?;
713
714 let io = hyper_util::rt::TokioIo::new(io);
715 let (mut request_sender, connection) = hyper::client::conn::http1::Builder::new()
716 .handshake(io)
717 .await?;
718 tokio::spawn(
719 async move {
721 debug!("HTTP upgrade driver started");
722 if let Err(err) = connection.with_upgrades().await {
723 error!("HTTP upgrade error: {err:#}");
724 }
725 debug!("HTTP upgrade driver finished");
726 }
727 .instrument(info_span!("http-driver")),
728 );
729 debug!("Sending upgrade request");
730 let req = Request::builder()
731 .uri(RELAY_PATH)
732 .header(UPGRADE, Protocol::Relay.upgrade_header())
733 .header(HOST, host_header_value)
737 .body(http_body_util::Empty::<hyper::body::Bytes>::new())?;
738 request_sender.send_request(req).await.map_err(From::from)
739 }
740
741 async fn note_preferred(&mut self, is_preferred: bool) {
742 let old = &mut self.is_preferred;
743 if *old == is_preferred {
744 return;
745 }
746 *old = is_preferred;
747
748 let res = {
750 if let Some((ref conn, _)) = self.relay_conn {
751 conn.note_preferred(is_preferred).await
752 } else {
753 return;
754 }
755 };
756 if res.is_err() {
759 self.close_for_reconnect().await;
760 }
761 }
762
763 fn local_addr(&self) -> Option<SocketAddr> {
764 if self.is_closed {
765 return None;
766 }
767 if let Some((ref conn, _)) = self.relay_conn {
768 conn.local_addr()
769 } else {
770 None
771 }
772 }
773
774 async fn ping(&mut self, s: oneshot::Sender<Result<Duration, ClientError>>) {
775 let connect_res = self.connect("ping").await.map(|(c, _)| c);
776 let (ping, recv) = self.pings.register();
777 trace!("ping: {}", hex::encode(ping));
778
779 self.ping_tasks.spawn(async move {
780 let res = match connect_res {
781 Ok(conn) => {
782 let start = Instant::now();
783 if let Err(err) = conn.send_ping(ping).await {
784 warn!("failed to send ping: {:?}", err);
785 Err(ClientError::Send)
786 } else {
787 match tokio::time::timeout(PING_TIMEOUT, recv).await {
788 Ok(Ok(())) => Ok(start.elapsed()),
789 Err(_) => Err(ClientError::PingTimeout),
790 Ok(Err(_)) => Err(ClientError::PingAborted),
791 }
792 }
793 }
794 Err(err) => Err(err),
795 };
796 s.send(res).ok();
797 });
798 }
799
800 async fn send(&mut self, remote_node: NodeId, payload: Bytes) -> Result<(), ClientError> {
801 trace!(remote_node = %remote_node.fmt_short(), len = payload.len(), "send");
802 let (conn, _) = self.connect("send").await?;
803 if conn.send(remote_node, payload).await.is_err() {
804 self.close_for_reconnect().await;
805 return Err(ClientError::Send);
806 }
807 Ok(())
808 }
809
810 async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> {
811 debug!("send_pong");
812 if self.can_ack_pings {
813 let (conn, _) = self.connect("send_pong").await?;
814 if conn.send_pong(data).await.is_err() {
815 self.close_for_reconnect().await;
816 return Err(ClientError::Send);
817 }
818 Ok(())
819 } else {
820 Err(ClientError::CannotAckPings)
821 }
822 }
823
824 async fn close(mut self) {
825 if !self.is_closed {
826 self.is_closed = true;
827 self.close_for_reconnect().await;
828 }
829 }
830
831 fn is_connected(&self) -> bool {
832 if self.is_closed {
833 return false;
834 }
835 self.relay_conn.is_some()
836 }
837
838 fn tls_servername(&self) -> Option<rustls::pki_types::ServerName> {
839 self.url
840 .host_str()
841 .and_then(|s| rustls::pki_types::ServerName::try_from(s).ok())
842 }
843
844 fn use_tls(&self) -> bool {
845 #[allow(clippy::match_like_matches_macro)]
847 match self.url.scheme() {
848 "http" => false,
849 "ws" => false,
850 _ => true,
851 }
852 }
853
854 async fn dial_url(&self) -> Result<ProxyStream, ClientError> {
855 if let Some(ref proxy) = self.proxy_url {
856 let stream = self.dial_url_proxy(proxy.clone()).await?;
857 Ok(ProxyStream::Proxied(stream))
858 } else {
859 let stream = self.dial_url_direct().await?;
860 Ok(ProxyStream::Raw(stream))
861 }
862 }
863
864 async fn dial_url_direct(&self) -> Result<TcpStream, ClientError> {
865 debug!(%self.url, "dial url");
866 let prefer_ipv6 = self.prefer_ipv6().await;
867 let dst_ip = resolve_host(&self.dns_resolver, &self.url, prefer_ipv6).await?;
868
869 let port = url_port(&self.url)
870 .ok_or_else(|| ClientError::InvalidUrl("missing url port".into()))?;
871 let addr = SocketAddr::new(dst_ip, port);
872
873 debug!("connecting to {}", addr);
874 let tcp_stream =
875 tokio::time::timeout(
876 DIAL_NODE_TIMEOUT,
877 async move { TcpStream::connect(addr).await },
878 )
879 .await
880 .map_err(|_| ClientError::ConnectTimeout)?
881 .map_err(ClientError::DialIO)?;
882
883 tcp_stream.set_nodelay(true)?;
884
885 Ok(tcp_stream)
886 }
887
888 async fn dial_url_proxy(
889 &self,
890 proxy_url: Url,
891 ) -> Result<chain::Chain<std::io::Cursor<Bytes>, MaybeTlsStream>, ClientError> {
892 debug!(%self.url, %proxy_url, "dial url via proxy");
893
894 let prefer_ipv6 = self.prefer_ipv6().await;
896 let proxy_ip = resolve_host(&self.dns_resolver, &proxy_url, prefer_ipv6).await?;
897
898 let proxy_port = url_port(&proxy_url)
899 .ok_or_else(|| ClientError::Proxy("missing proxy url port".into()))?;
900 let proxy_addr = SocketAddr::new(proxy_ip, proxy_port);
901
902 debug!(%proxy_addr, "connecting to proxy");
903
904 let tcp_stream = tokio::time::timeout(DIAL_NODE_TIMEOUT, async move {
905 TcpStream::connect(proxy_addr).await
906 })
907 .await
908 .map_err(|_| ClientError::ConnectTimeout)?
909 .map_err(ClientError::DialIO)?;
910
911 tcp_stream.set_nodelay(true)?;
912
913 let io = if proxy_url.scheme() == "http" {
915 MaybeTlsStream::Raw(tcp_stream)
916 } else {
917 let hostname = proxy_url
918 .host_str()
919 .and_then(|s| rustls::pki_types::ServerName::try_from(s.to_string()).ok())
920 .ok_or_else(|| ClientError::InvalidUrl("No tls servername for proxy url".into()))?;
921 let tls_stream = self.tls_connector.connect(hostname, tcp_stream).await?;
922 MaybeTlsStream::Tls(tls_stream)
923 };
924 let io = TokioIo::new(io);
925
926 let target_host = self
927 .url
928 .host_str()
929 .ok_or_else(|| ClientError::Proxy("missing proxy host".into()))?;
930
931 let port =
932 url_port(&self.url).ok_or_else(|| ClientError::Proxy("invalid target port".into()))?;
933
934 let mut req_builder = Request::builder()
936 .uri(format!("{}:{}", target_host, port))
937 .method("CONNECT")
938 .header("Host", target_host)
939 .header("Proxy-Connection", "Keep-Alive");
940 if !proxy_url.username().is_empty() {
941 debug!(
944 "setting proxy-authorization: username={}",
945 proxy_url.username()
946 );
947 let to_encode = format!(
948 "{}:{}",
949 proxy_url.username(),
950 proxy_url.password().unwrap_or_default()
951 );
952 let encoded = URL_SAFE.encode(to_encode);
953 req_builder = req_builder.header("Proxy-Authorization", format!("Basic {}", encoded));
954 }
955 let req = req_builder.body(Empty::<Bytes>::new())?;
956
957 debug!("Sending proxy request: {:?}", req);
958
959 let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
960 tokio::task::spawn(async move {
961 if let Err(err) = conn.with_upgrades().await {
962 error!("Proxy connection failed: {:?}", err);
963 }
964 });
965
966 let res = sender.send_request(req).await?;
967 if !res.status().is_success() {
968 return Err(ClientError::Proxy(format!(
969 "failed to connect to proxy: {}",
970 res.status(),
971 )));
972 }
973
974 let upgraded = hyper::upgrade::on(res).await?;
975 let Ok(Parts { io, read_buf, .. }) = upgraded.downcast::<TokioIo<MaybeTlsStream>>() else {
976 return Err(ClientError::Proxy("invalid upgrade".to_string()));
977 };
978
979 let res = chain::chain(std::io::Cursor::new(read_buf), io.into_inner());
980
981 Ok(res)
982 }
983
984 async fn prefer_ipv6(&self) -> bool {
990 match self.address_family_selector {
991 Some(ref selector) => selector().await,
992 None => false,
993 }
994 }
995
996 async fn recv_detail(&mut self) -> Result<ReceivedMessage, ClientError> {
997 if let Some((_conn, conn_receiver)) = self.relay_conn.as_mut() {
998 trace!("recv_detail tick");
999 match conn_receiver.recv().await {
1000 Ok(msg) => {
1001 return Ok(msg);
1002 }
1003 Err(e) => {
1004 self.close_for_reconnect().await;
1005 if self.is_closed {
1006 return Err(ClientError::Closed);
1007 }
1008 return Err(ClientError::Receive(e));
1010 }
1011 }
1012 }
1013 std::future::pending().await
1014 }
1015
1016 async fn close_for_reconnect(&mut self) {
1019 debug!("close for reconnect");
1020 if let Some((conn, _)) = self.relay_conn.take() {
1021 conn.close().await
1022 }
1023 }
1024}
1025
1026fn host_header_value(relay_url: RelayUrl) -> Result<String, ClientError> {
1027 let relay_url_host = relay_url
1029 .host_str()
1030 .ok_or_else(|| ClientError::InvalidUrl(relay_url.to_string()))?;
1031 let relay_url_host = relay_url_host.strip_suffix(".").unwrap_or(relay_url_host);
1033 let mut host_header_value = String::with_capacity(relay_url_host.len() + 6);
1035 host_header_value += relay_url_host;
1036 if let Some(port) = relay_url.port() {
1037 host_header_value += ":";
1038 host_header_value += &port.to_string();
1039 }
1040 Ok(host_header_value)
1041}
1042
1043async fn resolve_host(
1044 resolver: &DnsResolver,
1045 url: &Url,
1046 prefer_ipv6: bool,
1047) -> Result<IpAddr, ClientError> {
1048 let host = url
1049 .host()
1050 .ok_or_else(|| ClientError::InvalidUrl("missing host".into()))?;
1051 match host {
1052 url::Host::Domain(domain) => {
1053 let mut addrs = resolver
1055 .lookup_ipv4_ipv6(domain, DNS_TIMEOUT)
1056 .await
1057 .map_err(|e| ClientError::Dns(Some(e)))?
1058 .peekable();
1059
1060 let found = if prefer_ipv6 {
1061 let first = addrs.peek().copied();
1062 addrs.find(IpAddr::is_ipv6).or(first)
1063 } else {
1064 addrs.next()
1065 };
1066
1067 found.ok_or_else(|| ClientError::Dns(None))
1068 }
1069 url::Host::Ipv4(ip) => Ok(IpAddr::V4(ip)),
1070 url::Host::Ipv6(ip) => Ok(IpAddr::V6(ip)),
1071 }
1072}
1073
1074#[cfg(any(test, feature = "test-utils"))]
1076#[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
1077#[derive(Debug)]
1078struct NoCertVerifier;
1079
1080#[cfg(any(test, feature = "test-utils"))]
1081impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
1082 fn verify_server_cert(
1083 &self,
1084 _end_entity: &rustls::pki_types::CertificateDer,
1085 _intermediates: &[rustls::pki_types::CertificateDer],
1086 _server_name: &rustls::pki_types::ServerName,
1087 _ocsp_response: &[u8],
1088 _now: rustls::pki_types::UnixTime,
1089 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
1090 Ok(rustls::client::danger::ServerCertVerified::assertion())
1091 }
1092 fn verify_tls12_signature(
1093 &self,
1094 _message: &[u8],
1095 _cert: &rustls::pki_types::CertificateDer<'_>,
1096 _dss: &rustls::DigitallySignedStruct,
1097 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1098 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1099 }
1100
1101 fn verify_tls13_signature(
1102 &self,
1103 _message: &[u8],
1104 _cert: &rustls::pki_types::CertificateDer<'_>,
1105 _dss: &rustls::DigitallySignedStruct,
1106 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
1107 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1108 }
1109
1110 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1111 rustls::crypto::ring::default_provider()
1112 .signature_verification_algorithms
1113 .supported_schemes()
1114 }
1115}
1116
1117fn url_port(url: &Url) -> Option<u16> {
1118 if let Some(port) = url.port() {
1119 return Some(port);
1120 }
1121
1122 match url.scheme() {
1123 "http" => Some(80),
1124 "https" => Some(443),
1125 _ => None,
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use std::str::FromStr;
1132
1133 use anyhow::{bail, Result};
1134
1135 use super::*;
1136 use crate::dns::default_resolver;
1137
1138 #[tokio::test]
1139 async fn test_recv_detail_connect_error() -> Result<()> {
1140 let _guard = iroh_test::logging::setup();
1141
1142 let key = SecretKey::generate();
1143 let bad_url: Url = "https://bad.url".parse().unwrap();
1144 let dns_resolver = default_resolver();
1145
1146 let (_client, mut client_receiver) =
1147 ClientBuilder::new(bad_url).build(key.clone(), dns_resolver.clone());
1148
1149 if client_receiver.recv().await.and_then(|s| s.ok()).is_some() {
1152 bail!("expected client with bad relay node detail to return with an error");
1153 }
1154 Ok(())
1155 }
1156
1157 #[test]
1158 fn test_host_header_value() -> Result<()> {
1159 let _guard = iroh_test::logging::setup();
1160
1161 let cases = [
1162 (
1163 "https://euw1-1.relay.iroh.network.",
1164 "euw1-1.relay.iroh.network",
1165 ),
1166 ("http://localhost:8080", "localhost:8080"),
1167 ];
1168
1169 for (url, expected_host) in cases {
1170 let relay_url = RelayUrl::from_str(url)?;
1171 let host = host_header_value(relay_url)?;
1172 assert_eq!(host, expected_host);
1173 }
1174
1175 Ok(())
1176 }
1177}