iroh_net/relay/
client.rs

1//! Based on tailscale/derp/derphttp/derphttp_client.go
2
3use std::{
4    collections::HashMap,
5    net::{IpAddr, SocketAddr},
6    sync::Arc,
7    time::Duration,
8};
9
10use base64::{engine::general_purpose::URL_SAFE, Engine as _};
11use bytes::Bytes;
12use conn::{Conn, ConnBuilder, ConnReader, ConnReceiver, ConnWriter, ReceivedMessage};
13use futures_lite::future::Boxed as BoxFuture;
14use futures_util::StreamExt;
15use http_body_util::Empty;
16use hyper::{
17    body::Incoming,
18    header::{HOST, UPGRADE},
19    upgrade::Parts,
20    Request,
21};
22use hyper_util::rt::TokioIo;
23use rand::Rng;
24use rustls::client::Resumption;
25use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream};
26use tokio::{
27    io::{AsyncRead, AsyncWrite},
28    net::TcpStream,
29    sync::{mpsc, oneshot},
30    task::JoinSet,
31    time::Instant,
32};
33use tokio_util::{
34    codec::{FramedRead, FramedWrite},
35    task::AbortOnDropHandle,
36};
37use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level};
38use url::Url;
39
40use crate::{
41    defaults::timeouts::relay::*,
42    dns::{DnsResolver, ResolverExt},
43    key::{NodeId, PublicKey, SecretKey},
44    relay::{
45        codec::DerpCodec,
46        http::{Protocol, RELAY_PATH},
47        RelayUrl,
48    },
49    util::chain,
50};
51
52pub(crate) mod conn;
53pub(crate) mod streams;
54
/// Possible connection errors on the [`Client`]
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// The client is closed
    #[error("client is closed")]
    Closed,
    /// No underlying relay [`super::client::Client`] exists for this http relay [`Client`]
    #[error("no relay client")]
    NoClient,
    /// There was an error sending a packet
    #[error("error sending a packet")]
    Send,
    /// There was an error receiving a packet
    #[error("error receiving a packet: {0:?}")]
    Receive(anyhow::Error),
    /// Connecting (or dialing) did not finish within the allowed time
    #[error("connect timeout")]
    ConnectTimeout,
    /// The relay node is not available
    #[error("Relay node is not available")]
    RelayNodeNotAvail,
    /// No relay nodes are available with that name
    #[error("no nodes available for {0}")]
    NoNodeForTarget(String),
    /// The relay node specified only allows STUN requests
    #[error("no relay nodes found for {0}, only are stun_only nodes")]
    StunOnlyNodesFound(String),
    /// There was an I/O error while dialing
    #[error("dial error")]
    DialIO(#[from] std::io::Error),
    /// There was an error from the task doing the dialing
    #[error("dial error")]
    DialTask(#[from] tokio::task::JoinError),
    /// Both IPv4 and IPv6 are disabled for this relay node
    #[error("both IPv4 and IPv6 are explicitly disabled for this node")]
    IPDisabled,
    /// The local address of the connection could not be determined
    #[error("no local addr: {0}")]
    NoLocalAddr(String),
    /// There was an HTTP connection error, see [`hyper::Error`]
    #[error("http connection error")]
    Hyper(#[from] hyper::Error),
    /// There was an http error [`http::Error`].
    #[error("http error")]
    Http(#[from] http::Error),
    /// There was an unexpected status code: the first field is the expected status,
    /// the second the status actually received
    #[error("unexpected status code: expected {0}, got {1}")]
    UnexpectedStatusCode(hyper::StatusCode, hyper::StatusCode),
    /// The connection failed to upgrade
    #[error("failed to upgrade connection: {0}")]
    Upgrade(String),
    /// The connection failed to proxy
    #[error("failed to proxy connection: {0}")]
    Proxy(String),
    /// The relay [`super::client::Client`] failed to build
    #[error("failed to build relay client: {0}")]
    Build(String),
    /// The ping request timed out
    #[error("ping timeout")]
    PingTimeout,
    /// The ping request was aborted
    #[error("ping aborted")]
    PingAborted,
    /// This [`Client`] cannot acknowledge pings
    #[error("cannot acknowledge pings")]
    CannotAckPings,
    /// The given [`Url`] is invalid
    #[error("invalid url: {0}")]
    InvalidUrl(String),
    /// There was an error with DNS resolution
    #[error("dns: {0:?}")]
    Dns(Option<anyhow::Error>),
    /// There was a timeout resolving DNS.
    #[error("dns timeout")]
    DnsTimeout,
    /// The inner actor is gone, likely means things are shutdown.
    #[error("actor gone")]
    ActorGone,
    /// An error related to websockets, either errors with parsing ws messages or the handshake
    #[error("websocket error: {0}")]
    WebsocketError(#[from] tokio_tungstenite_wasm::Error),
}
137
/// An HTTP Relay client.
///
/// Cheaply clonable.
///
/// A `Client` is only a handle: every operation is sent as an [`ActorMessage`] to a
/// background actor task, which owns the actual relay connection.
#[derive(Clone, Debug)]
pub struct Client {
    /// Sending half of the actor's inbox.
    inner: mpsc::Sender<ActorMessage>,
    /// Public half of the secret key this client was built with.
    public_key: PublicKey,
    /// Handle to the spawned actor task.
    ///
    /// Never read, hence the `dead_code` allowance: it is held so the
    /// [`AbortOnDropHandle`] is dropped (aborting the task) once the last clone of the
    /// [`Client`] goes away.
    #[allow(dead_code)]
    recv_loop: Arc<AbortOnDropHandle<()>>,
}
148
/// Requests sent from a [`Client`] handle to the actor task.
///
/// Variants carrying a [`oneshot::Sender`] are answered by the actor over that channel.
#[derive(Debug)]
enum ActorMessage {
    /// Connect to the relay (or reuse the current connection) and return the [`Conn`].
    Connect(oneshot::Sender<Result<Conn, ClientError>>),
    /// Update whether this client is the preferred one; no reply is sent.
    NotePreferred(bool),
    /// Ask for the local address of the current connection, if any.
    LocalAddr(oneshot::Sender<Result<Option<SocketAddr>, ClientError>>),
    /// Send a ping and reply with the measured round-trip time.
    Ping(oneshot::Sender<Result<Duration, ClientError>>),
    /// Send a pong carrying the given 8-byte payload.
    Pong([u8; 8], oneshot::Sender<Result<(), ClientError>>),
    /// Send the given bytes to the node with the given public key.
    Send(PublicKey, Bytes, oneshot::Sender<Result<(), ClientError>>),
    /// Close the client; the actor loop stops after replying.
    Close(oneshot::Sender<Result<(), ClientError>>),
    /// Drop the current connection but keep the actor running.
    CloseForReconnect(oneshot::Sender<Result<(), ClientError>>),
    /// Ask whether a relay connection is currently established.
    IsConnected(oneshot::Sender<Result<bool, ClientError>>),
}
161
/// Receiving end of a [`Client`].
///
/// Yields what the actor task forwards from the relay connection: received messages
/// and the errors encountered while connecting or receiving.
#[derive(Debug)]
pub struct ClientReceiver {
    /// Channel fed by the actor task.
    msg_receiver: mpsc::Receiver<Result<ReceivedMessage, ClientError>>,
}
167
/// State owned by the background task that serves a [`Client`].
#[derive(derive_more::Debug)]
struct Actor {
    /// Secret key handed to the [`ConnBuilder`] for each new relay connection.
    secret_key: SecretKey,
    /// Whether `send_pong` is allowed; when `false` it fails with
    /// [`ClientError::CannotAckPings`].
    can_ack_pings: bool,
    /// Whether this client is the preferred one; announced to the server on connect.
    is_preferred: bool,
    /// The current relay connection and its receiver, `None` while disconnected.
    relay_conn: Option<(Conn, ConnReceiver)>,
    /// Set once the client has been closed; `connect` then fails with
    /// [`ClientError::Closed`].
    is_closed: bool,
    /// Optional callback configured through the builder.
    // NOTE(review): presumably consulted by `prefer_ipv6` when dialing; that method is
    // outside the part of the file reviewed here.
    #[debug("address family selector callback")]
    address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
    /// URL of the relay server.
    url: RelayUrl,
    /// Whether to connect via websockets or via the HTTP-upgrade relay protocol.
    protocol: Protocol,
    /// TLS connector used when the relay URL requires TLS.
    #[debug("TlsConnector")]
    tls_connector: tokio_rustls::TlsConnector,
    /// Pings that have been sent and are waiting for their pong.
    pings: PingTracker,
    /// Tasks that send a ping and wait for its pong (or a timeout).
    ping_tasks: JoinSet<()>,
    /// Resolver used to look up the relay host when dialing directly.
    dns_resolver: DnsResolver,
    /// Optional HTTP proxy to dial through instead of connecting directly.
    proxy_url: Option<Url>,
}
186
187#[derive(Default, Debug)]
188struct PingTracker(HashMap<[u8; 8], oneshot::Sender<()>>);
189
190impl PingTracker {
191    /// Note that we have sent a ping, and store the [`oneshot::Sender`] we
192    /// must notify when the pong returns
193    fn register(&mut self) -> ([u8; 8], oneshot::Receiver<()>) {
194        let data = rand::thread_rng().gen::<[u8; 8]>();
195        let (send, recv) = oneshot::channel();
196        self.0.insert(data, send);
197        (data, recv)
198    }
199
200    /// Remove the associated [`oneshot::Sender`] for `data` & return it.
201    ///
202    /// If there is no [`oneshot::Sender`] in the tracker, return `None`.
203    fn unregister(&mut self, data: [u8; 8], why: &'static str) -> Option<oneshot::Sender<()>> {
204        trace!("removing ping {}: {}", hex::encode(data), why);
205        self.0.remove(&data)
206    }
207}
208
/// Builder for a [`Client`] and its [`ClientReceiver`].
#[derive(derive_more::Debug)]
pub struct ClientBuilder {
    /// Whether the client may answer pings with pongs. Default is false
    can_ack_pings: bool,
    /// Whether this client is the preferred one. Default is false
    is_preferred: bool,
    /// Callback hinting whether IPv6 should be preferred when dialing. Default is None
    #[debug("address family selector callback")]
    address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
    /// Whether this client is a prober. Default is false
    // NOTE(review): stored by the setter but not read by `build`.
    is_prober: bool,
    /// Expected PublicKey of the server
    // NOTE(review): stored by the setter but not read by `build`.
    server_public_key: Option<PublicKey>,
    /// Server url.
    url: RelayUrl,
    /// Relay protocol
    protocol: Protocol,
    /// Allow self-signed certificates from relay servers
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
    insecure_skip_cert_verify: bool,
    /// HTTP Proxy
    proxy_url: Option<Url>,
}
234
235impl ClientBuilder {
236    /// Create a new [`ClientBuilder`]
237    pub fn new(url: impl Into<RelayUrl>) -> Self {
238        ClientBuilder {
239            can_ack_pings: false,
240            is_preferred: false,
241            address_family_selector: None,
242            is_prober: false,
243            server_public_key: None,
244            url: url.into(),
245            protocol: Protocol::Relay,
246            #[cfg(any(test, feature = "test-utils"))]
247            insecure_skip_cert_verify: false,
248            proxy_url: None,
249        }
250    }
251
252    /// Sets the server url
253    pub fn server_url(mut self, url: impl Into<RelayUrl>) -> Self {
254        self.url = url.into();
255        self
256    }
257
258    /// Sets whether to connect to the relay via websockets or not.
259    /// Set to use non-websocket, normal relaying by default.
260    pub fn protocol(mut self, protocol: Protocol) -> Self {
261        self.protocol = protocol;
262        self
263    }
264
265    /// Returns if we should prefer ipv6
266    /// it replaces the relayhttp.AddressFamilySelector we pass
267    /// It provides the hint as to whether in an IPv4-vs-IPv6 race that
268    /// IPv4 should be held back a bit to give IPv6 a better-than-50/50
269    /// chance of winning. We only return true when we believe IPv6 will
270    /// work anyway, so we don't artificially delay the connection speed.
271    pub fn address_family_selector<S>(mut self, selector: S) -> Self
272    where
273        S: Fn() -> BoxFuture<bool> + Send + Sync + 'static,
274    {
275        self.address_family_selector = Some(Box::new(selector));
276        self
277    }
278
279    /// Enable this [`Client`] to acknowledge pings.
280    pub fn can_ack_pings(mut self, can: bool) -> Self {
281        self.can_ack_pings = can;
282        self
283    }
284
285    /// Indicate this client is the preferred way to communicate
286    /// to the peer with this client's [`PublicKey`]
287    pub fn is_preferred(mut self, is: bool) -> Self {
288        self.is_preferred = is;
289        self
290    }
291
292    /// Indicates this client is a prober
293    pub fn is_prober(mut self, is: bool) -> Self {
294        self.is_prober = is;
295        self
296    }
297
298    /// Skip the verification of the relay server's SSL certificates.
299    ///
300    /// May only be used in tests.
301    #[cfg(any(test, feature = "test-utils"))]
302    #[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
303    pub fn insecure_skip_cert_verify(mut self, skip: bool) -> Self {
304        self.insecure_skip_cert_verify = skip;
305        self
306    }
307
308    /// Set an explicit proxy url to proxy all HTTP(S) traffic through.
309    pub fn proxy_url(mut self, url: Url) -> Self {
310        self.proxy_url.replace(url);
311        self
312    }
313
314    /// Build the [`Client`]
315    pub fn build(self, key: SecretKey, dns_resolver: DnsResolver) -> (Client, ClientReceiver) {
316        // TODO: review TLS config
317        let roots = rustls::RootCertStore {
318            roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
319        };
320        let mut config = rustls::client::ClientConfig::builder_with_provider(Arc::new(
321            rustls::crypto::ring::default_provider(),
322        ))
323        .with_safe_default_protocol_versions()
324        .expect("protocols supported by ring")
325        .with_root_certificates(roots)
326        .with_no_client_auth();
327        #[cfg(any(test, feature = "test-utils"))]
328        if self.insecure_skip_cert_verify {
329            warn!("Insecure config: SSL certificates from relay servers will be trusted without verification");
330            config
331                .dangerous()
332                .set_certificate_verifier(Arc::new(NoCertVerifier));
333        }
334
335        config.resumption = Resumption::default();
336
337        let tls_connector: tokio_rustls::TlsConnector = Arc::new(config).into();
338        let public_key = key.public();
339
340        let inner = Actor {
341            secret_key: key,
342            can_ack_pings: self.can_ack_pings,
343            is_preferred: self.is_preferred,
344            relay_conn: None,
345            is_closed: false,
346            address_family_selector: self.address_family_selector,
347            pings: PingTracker::default(),
348            ping_tasks: Default::default(),
349            url: self.url,
350            protocol: self.protocol,
351            tls_connector,
352            dns_resolver,
353            proxy_url: self.proxy_url,
354        };
355
356        let (msg_sender, inbox) = mpsc::channel(64);
357        let (s, r) = mpsc::channel(64);
358        let recv_loop = tokio::task::spawn(
359            async move { inner.run(inbox, s).await }.instrument(info_span!("client")),
360        );
361
362        (
363            Client {
364                public_key,
365                inner: msg_sender,
366                recv_loop: Arc::new(AbortOnDropHandle::new(recv_loop)),
367            },
368            ClientReceiver { msg_receiver: r },
369        )
370    }
371
372    /// The expected [`PublicKey`] of the relay server we are connecting to.
373    pub fn server_public_key(mut self, server_public_key: PublicKey) -> Self {
374        self.server_public_key = Some(server_public_key);
375        self
376    }
377}
378
379impl ClientReceiver {
380    /// Reads a message from the server.
381    pub async fn recv(&mut self) -> Option<Result<ReceivedMessage, ClientError>> {
382        self.msg_receiver.recv().await
383    }
384}
385
386impl Client {
387    /// The public key for this client
388    pub fn public_key(&self) -> PublicKey {
389        self.public_key
390    }
391
392    async fn send_actor<F, T>(&self, msg_create: F) -> Result<T, ClientError>
393    where
394        F: FnOnce(oneshot::Sender<Result<T, ClientError>>) -> ActorMessage,
395    {
396        let (s, r) = oneshot::channel();
397        let msg = msg_create(s);
398        match self.inner.send(msg).await {
399            Ok(_) => {
400                let res = r.await.map_err(|_| ClientError::ActorGone)??;
401                Ok(res)
402            }
403            Err(_) => Err(ClientError::ActorGone),
404        }
405    }
406
407    /// Connects to a relay Server and returns the underlying relay connection.
408    ///
409    /// Returns [`ClientError::Closed`] if the [`Client`] is closed.
410    ///
411    /// If there is already an active relay connection, returns the already
412    /// connected [`crate::relay::RelayConn`].
413    pub async fn connect(&self) -> Result<Conn, ClientError> {
414        self.send_actor(ActorMessage::Connect).await
415    }
416
417    /// Let the server know that this client is the preferred client
418    pub async fn note_preferred(&self, is_preferred: bool) {
419        self.inner
420            .send(ActorMessage::NotePreferred(is_preferred))
421            .await
422            .ok();
423    }
424
425    /// Get the local addr of the connection. If there is no current underlying relay connection
426    /// or the [`Client`] is closed, returns `None`.
427    pub async fn local_addr(&self) -> Option<SocketAddr> {
428        self.send_actor(ActorMessage::LocalAddr)
429            .await
430            .ok()
431            .flatten()
432    }
433
434    /// Send a ping to the server. Return once we get an expected pong.
435    ///
436    /// There must be a task polling `recv_detail` to process the `pong` response.
437    pub async fn ping(&self) -> Result<Duration, ClientError> {
438        self.send_actor(ActorMessage::Ping).await
439    }
440
441    /// Send a pong back to the server.
442    ///
443    /// If there is no underlying active relay connection, it creates one before attempting to
444    /// send the pong message.
445    ///
446    /// If there is an error sending pong, it closes the underlying relay connection before
447    /// returning.
448    pub async fn send_pong(&self, data: [u8; 8]) -> Result<(), ClientError> {
449        self.send_actor(|s| ActorMessage::Pong(data, s)).await
450    }
451
452    /// Send a packet to the server.
453    ///
454    /// If there is no underlying active relay connection, it creates one before attempting to
455    /// send the message.
456    ///
457    /// If there is an error sending the packet, it closes the underlying relay connection before
458    /// returning.
459    pub async fn send(&self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> {
460        self.send_actor(|s| ActorMessage::Send(dst_key, b, s)).await
461    }
462
463    /// Close the http relay connection.
464    pub async fn close(self) -> Result<(), ClientError> {
465        self.send_actor(ActorMessage::Close).await
466    }
467
468    /// Disconnect the http relay connection.
469    pub async fn close_for_reconnect(&self) -> Result<(), ClientError> {
470        self.send_actor(ActorMessage::CloseForReconnect).await
471    }
472
473    /// Returns `true` if the underlying relay connection is established.
474    pub async fn is_connected(&self) -> Result<bool, ClientError> {
475        self.send_actor(ActorMessage::IsConnected).await
476    }
477}
478
479impl Actor {
480    async fn run(
481        mut self,
482        mut inbox: mpsc::Receiver<ActorMessage>,
483        msg_sender: mpsc::Sender<Result<ReceivedMessage, ClientError>>,
484    ) {
485        // Add an initial connection attempt.
486        if let Err(err) = self.connect("initial connect").await {
487            msg_sender.send(Err(err)).await.ok();
488        }
489
490        loop {
491            tokio::select! {
492                res = self.recv_detail() => {
493                    if let Ok(ReceivedMessage::Pong(ping)) = res {
494                        match self.pings.unregister(ping, "pong") {
495                            Some(chan) => {
496                                if chan.send(()).is_err() {
497                                    warn!("pong received for ping {ping:?}, but the receiving channel was closed");
498                                }
499                            }
500                            None => {
501                                warn!("pong received for ping {ping:?}, but not registered");
502                            }
503                        }
504                        continue;
505                    }
506                    msg_sender.send(res).await.ok();
507                }
508                Some(msg) = inbox.recv() => {
509                    match msg {
510                        ActorMessage::Connect(s) => {
511                            let res = self.connect("actor msg").await.map(|(client, _)| (client));
512                            s.send(res).ok();
513                        },
514                        ActorMessage::NotePreferred(is_preferred) => {
515                            self.note_preferred(is_preferred).await;
516                        },
517                        ActorMessage::LocalAddr(s) => {
518                            let res = self.local_addr();
519                            s.send(Ok(res)).ok();
520                        },
521                        ActorMessage::Ping(s) => {
522                            self.ping(s).await;
523                        },
524                        ActorMessage::Pong(data, s) => {
525                            let res = self.send_pong(data).await;
526                            s.send(res).ok();
527                        },
528                        ActorMessage::Send(key, data, s) => {
529                            let res = self.send(key, data).await;
530                            s.send(res).ok();
531                        },
532                        ActorMessage::Close(s) => {
533                            let res = self.close().await;
534                            s.send(Ok(res)).ok();
535                            // shutting down
536                            break;
537                        },
538                        ActorMessage::CloseForReconnect(s) => {
539                            let res = self.close_for_reconnect().await;
540                            s.send(Ok(res)).ok();
541                        },
542                        ActorMessage::IsConnected(s) => {
543                            let res = self.is_connected();
544                            s.send(Ok(res)).ok();
545                        },
546                    }
547                }
548                else => {
549                    // Shutting down
550                    self.close().await;
551                    break;
552                }
553            }
554        }
555    }
556
557    /// Returns a connection to the relay.
558    ///
559    /// If the client is currently connected, the existing connection is returned; otherwise,
560    /// a new connection is made.
561    ///
562    /// Returns:
563    /// - A clonable connection object which can send DISCO messages to the relay.
564    /// - A reference to a channel receiving DISCO messages from the relay.
565    async fn connect(
566        &mut self,
567        why: &'static str,
568    ) -> Result<(Conn, &'_ mut ConnReceiver), ClientError> {
569        if self.is_closed {
570            return Err(ClientError::Closed);
571        }
572        let url = self.url.clone();
573        async move {
574            if self.relay_conn.is_none() {
575                trace!("no connection, trying to connect");
576                let (conn, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
577                    .await
578                    .map_err(|_| ClientError::ConnectTimeout)??;
579
580                self.relay_conn = Some((conn, receiver));
581            } else {
582                trace!("already had connection");
583            }
584            let (conn, receiver) = self
585                .relay_conn
586                .as_mut()
587                .map(|(c, r)| (c.clone(), r))
588                .expect("just checked");
589
590            Ok((conn, receiver))
591        }
592        .instrument(info_span!("connect", %url, %why))
593        .await
594    }
595
596    async fn connect_0(&self) -> Result<(Conn, ConnReceiver), ClientError> {
597        let (reader, writer, local_addr) = match self.protocol {
598            Protocol::Websocket => {
599                let (reader, writer) = self.connect_ws().await?;
600                let local_addr = None;
601                (reader, writer, local_addr)
602            }
603            Protocol::Relay => {
604                let (reader, writer, local_addr) = self.connect_derp().await?;
605                (reader, writer, Some(local_addr))
606            }
607        };
608
609        let (conn, receiver) =
610            ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
611                .build()
612                .await
613                .map_err(|e| ClientError::Build(e.to_string()))?;
614
615        if self.is_preferred && conn.note_preferred(true).await.is_err() {
616            conn.close().await;
617            return Err(ClientError::Send);
618        }
619
620        event!(
621            target: "events.net.relay.connected",
622            Level::DEBUG,
623            home = self.is_preferred,
624            url = %self.url,
625        );
626
627        trace!("connect_0 done");
628        Ok((conn, receiver))
629    }
630
631    async fn connect_ws(&self) -> Result<(ConnReader, ConnWriter), ClientError> {
632        let mut dial_url = (*self.url).clone();
633        dial_url.set_path(RELAY_PATH);
634        // The relay URL is exchanged with the http(s) scheme in tickets and similar.
635        // We need to use the ws:// or wss:// schemes when connecting with websockets, though.
636        dial_url
637            .set_scheme(if self.use_tls() { "wss" } else { "ws" })
638            .map_err(|()| ClientError::InvalidUrl(self.url.to_string()))?;
639
640        debug!(%dial_url, "Dialing relay by websocket");
641
642        let (writer, reader) = tokio_tungstenite_wasm::connect(dial_url).await?.split();
643
644        let reader = ConnReader::Ws(reader);
645        let writer = ConnWriter::Ws(writer);
646
647        Ok((reader, writer))
648    }
649
650    async fn connect_derp(&self) -> Result<(ConnReader, ConnWriter, SocketAddr), ClientError> {
651        let url = self.url.clone();
652        let tcp_stream = self.dial_url().await?;
653
654        let local_addr = tcp_stream
655            .local_addr()
656            .map_err(|e| ClientError::NoLocalAddr(e.to_string()))?;
657
658        debug!(server_addr = ?tcp_stream.peer_addr(), %local_addr, "TCP stream connected");
659
660        let response = if self.use_tls() {
661            debug!("Starting TLS handshake");
662            let hostname = self
663                .tls_servername()
664                .ok_or_else(|| ClientError::InvalidUrl("No tls servername".into()))?;
665            let hostname = hostname.to_owned();
666            let tls_stream = self.tls_connector.connect(hostname, tcp_stream).await?;
667            debug!("tls_connector connect success");
668            Self::start_upgrade(tls_stream, url).await?
669        } else {
670            debug!("Starting handshake");
671            Self::start_upgrade(tcp_stream, url).await?
672        };
673
674        if response.status() != hyper::StatusCode::SWITCHING_PROTOCOLS {
675            error!(
676                "expected status 101 SWITCHING_PROTOCOLS, got: {}",
677                response.status()
678            );
679            return Err(ClientError::UnexpectedStatusCode(
680                hyper::StatusCode::SWITCHING_PROTOCOLS,
681                response.status(),
682            ));
683        }
684
685        debug!("starting upgrade");
686        let upgraded = match hyper::upgrade::on(response).await {
687            Ok(upgraded) => upgraded,
688            Err(err) => {
689                warn!("upgrade failed: {:#}", err);
690                return Err(ClientError::Hyper(err));
691            }
692        };
693
694        debug!("connection upgraded");
695        let (reader, writer) =
696            downcast_upgrade(upgraded).map_err(|e| ClientError::Upgrade(e.to_string()))?;
697
698        let reader = ConnReader::Derp(FramedRead::new(reader, DerpCodec));
699        let writer = ConnWriter::Derp(FramedWrite::new(writer, DerpCodec));
700
701        Ok((reader, writer, local_addr))
702    }
703
704    /// Sends the HTTP upgrade request to the relay server.
705    async fn start_upgrade<T>(
706        io: T,
707        relay_url: RelayUrl,
708    ) -> Result<hyper::Response<Incoming>, ClientError>
709    where
710        T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
711    {
712        let host_header_value = host_header_value(relay_url)?;
713
714        let io = hyper_util::rt::TokioIo::new(io);
715        let (mut request_sender, connection) = hyper::client::conn::http1::Builder::new()
716            .handshake(io)
717            .await?;
718        tokio::spawn(
719            // This task drives the HTTP exchange, completes once connection is upgraded.
720            async move {
721                debug!("HTTP upgrade driver started");
722                if let Err(err) = connection.with_upgrades().await {
723                    error!("HTTP upgrade error: {err:#}");
724                }
725                debug!("HTTP upgrade driver finished");
726            }
727            .instrument(info_span!("http-driver")),
728        );
729        debug!("Sending upgrade request");
730        let req = Request::builder()
731            .uri(RELAY_PATH)
732            .header(UPGRADE, Protocol::Relay.upgrade_header())
733            // https://datatracker.ietf.org/doc/html/rfc2616#section-14.23
734            // > A client MUST include a Host header field in all HTTP/1.1 request messages.
735            // This header value helps reverse proxies identify how to forward requests.
736            .header(HOST, host_header_value)
737            .body(http_body_util::Empty::<hyper::body::Bytes>::new())?;
738        request_sender.send_request(req).await.map_err(From::from)
739    }
740
741    async fn note_preferred(&mut self, is_preferred: bool) {
742        let old = &mut self.is_preferred;
743        if *old == is_preferred {
744            return;
745        }
746        *old = is_preferred;
747
748        // only send the preference if we already have a connection
749        let res = {
750            if let Some((ref conn, _)) = self.relay_conn {
751                conn.note_preferred(is_preferred).await
752            } else {
753                return;
754            }
755        };
756        // need to do this outside the above closure because they rely on the same lock
757        // if there was an error sending, close the underlying relay connection
758        if res.is_err() {
759            self.close_for_reconnect().await;
760        }
761    }
762
763    fn local_addr(&self) -> Option<SocketAddr> {
764        if self.is_closed {
765            return None;
766        }
767        if let Some((ref conn, _)) = self.relay_conn {
768            conn.local_addr()
769        } else {
770            None
771        }
772    }
773
774    async fn ping(&mut self, s: oneshot::Sender<Result<Duration, ClientError>>) {
775        let connect_res = self.connect("ping").await.map(|(c, _)| c);
776        let (ping, recv) = self.pings.register();
777        trace!("ping: {}", hex::encode(ping));
778
779        self.ping_tasks.spawn(async move {
780            let res = match connect_res {
781                Ok(conn) => {
782                    let start = Instant::now();
783                    if let Err(err) = conn.send_ping(ping).await {
784                        warn!("failed to send ping: {:?}", err);
785                        Err(ClientError::Send)
786                    } else {
787                        match tokio::time::timeout(PING_TIMEOUT, recv).await {
788                            Ok(Ok(())) => Ok(start.elapsed()),
789                            Err(_) => Err(ClientError::PingTimeout),
790                            Ok(Err(_)) => Err(ClientError::PingAborted),
791                        }
792                    }
793                }
794                Err(err) => Err(err),
795            };
796            s.send(res).ok();
797        });
798    }
799
800    async fn send(&mut self, remote_node: NodeId, payload: Bytes) -> Result<(), ClientError> {
801        trace!(remote_node = %remote_node.fmt_short(), len = payload.len(), "send");
802        let (conn, _) = self.connect("send").await?;
803        if conn.send(remote_node, payload).await.is_err() {
804            self.close_for_reconnect().await;
805            return Err(ClientError::Send);
806        }
807        Ok(())
808    }
809
810    async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> {
811        debug!("send_pong");
812        if self.can_ack_pings {
813            let (conn, _) = self.connect("send_pong").await?;
814            if conn.send_pong(data).await.is_err() {
815                self.close_for_reconnect().await;
816                return Err(ClientError::Send);
817            }
818            Ok(())
819        } else {
820            Err(ClientError::CannotAckPings)
821        }
822    }
823
824    async fn close(mut self) {
825        if !self.is_closed {
826            self.is_closed = true;
827            self.close_for_reconnect().await;
828        }
829    }
830
831    fn is_connected(&self) -> bool {
832        if self.is_closed {
833            return false;
834        }
835        self.relay_conn.is_some()
836    }
837
838    fn tls_servername(&self) -> Option<rustls::pki_types::ServerName> {
839        self.url
840            .host_str()
841            .and_then(|s| rustls::pki_types::ServerName::try_from(s).ok())
842    }
843
844    fn use_tls(&self) -> bool {
845        // only disable tls if we are explicitly dialing a http url
846        #[allow(clippy::match_like_matches_macro)]
847        match self.url.scheme() {
848            "http" => false,
849            "ws" => false,
850            _ => true,
851        }
852    }
853
854    async fn dial_url(&self) -> Result<ProxyStream, ClientError> {
855        if let Some(ref proxy) = self.proxy_url {
856            let stream = self.dial_url_proxy(proxy.clone()).await?;
857            Ok(ProxyStream::Proxied(stream))
858        } else {
859            let stream = self.dial_url_direct().await?;
860            Ok(ProxyStream::Raw(stream))
861        }
862    }
863
864    async fn dial_url_direct(&self) -> Result<TcpStream, ClientError> {
865        debug!(%self.url, "dial url");
866        let prefer_ipv6 = self.prefer_ipv6().await;
867        let dst_ip = resolve_host(&self.dns_resolver, &self.url, prefer_ipv6).await?;
868
869        let port = url_port(&self.url)
870            .ok_or_else(|| ClientError::InvalidUrl("missing url port".into()))?;
871        let addr = SocketAddr::new(dst_ip, port);
872
873        debug!("connecting to {}", addr);
874        let tcp_stream =
875            tokio::time::timeout(
876                DIAL_NODE_TIMEOUT,
877                async move { TcpStream::connect(addr).await },
878            )
879            .await
880            .map_err(|_| ClientError::ConnectTimeout)?
881            .map_err(ClientError::DialIO)?;
882
883        tcp_stream.set_nodelay(true)?;
884
885        Ok(tcp_stream)
886    }
887
888    async fn dial_url_proxy(
889        &self,
890        proxy_url: Url,
891    ) -> Result<chain::Chain<std::io::Cursor<Bytes>, MaybeTlsStream>, ClientError> {
892        debug!(%self.url, %proxy_url, "dial url via proxy");
893
894        // Resolve proxy DNS
895        let prefer_ipv6 = self.prefer_ipv6().await;
896        let proxy_ip = resolve_host(&self.dns_resolver, &proxy_url, prefer_ipv6).await?;
897
898        let proxy_port = url_port(&proxy_url)
899            .ok_or_else(|| ClientError::Proxy("missing proxy url port".into()))?;
900        let proxy_addr = SocketAddr::new(proxy_ip, proxy_port);
901
902        debug!(%proxy_addr, "connecting to proxy");
903
904        let tcp_stream = tokio::time::timeout(DIAL_NODE_TIMEOUT, async move {
905            TcpStream::connect(proxy_addr).await
906        })
907        .await
908        .map_err(|_| ClientError::ConnectTimeout)?
909        .map_err(ClientError::DialIO)?;
910
911        tcp_stream.set_nodelay(true)?;
912
913        // Setup TLS if necessary
914        let io = if proxy_url.scheme() == "http" {
915            MaybeTlsStream::Raw(tcp_stream)
916        } else {
917            let hostname = proxy_url
918                .host_str()
919                .and_then(|s| rustls::pki_types::ServerName::try_from(s.to_string()).ok())
920                .ok_or_else(|| ClientError::InvalidUrl("No tls servername for proxy url".into()))?;
921            let tls_stream = self.tls_connector.connect(hostname, tcp_stream).await?;
922            MaybeTlsStream::Tls(tls_stream)
923        };
924        let io = TokioIo::new(io);
925
926        let target_host = self
927            .url
928            .host_str()
929            .ok_or_else(|| ClientError::Proxy("missing proxy host".into()))?;
930
931        let port =
932            url_port(&self.url).ok_or_else(|| ClientError::Proxy("invalid target port".into()))?;
933
934        // Establish Proxy Tunnel
935        let mut req_builder = Request::builder()
936            .uri(format!("{}:{}", target_host, port))
937            .method("CONNECT")
938            .header("Host", target_host)
939            .header("Proxy-Connection", "Keep-Alive");
940        if !proxy_url.username().is_empty() {
941            // Passthrough authorization
942            // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Proxy-Authorization
943            debug!(
944                "setting proxy-authorization: username={}",
945                proxy_url.username()
946            );
947            let to_encode = format!(
948                "{}:{}",
949                proxy_url.username(),
950                proxy_url.password().unwrap_or_default()
951            );
952            let encoded = URL_SAFE.encode(to_encode);
953            req_builder = req_builder.header("Proxy-Authorization", format!("Basic {}", encoded));
954        }
955        let req = req_builder.body(Empty::<Bytes>::new())?;
956
957        debug!("Sending proxy request: {:?}", req);
958
959        let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
960        tokio::task::spawn(async move {
961            if let Err(err) = conn.with_upgrades().await {
962                error!("Proxy connection failed: {:?}", err);
963            }
964        });
965
966        let res = sender.send_request(req).await?;
967        if !res.status().is_success() {
968            return Err(ClientError::Proxy(format!(
969                "failed to connect to proxy: {}",
970                res.status(),
971            )));
972        }
973
974        let upgraded = hyper::upgrade::on(res).await?;
975        let Ok(Parts { io, read_buf, .. }) = upgraded.downcast::<TokioIo<MaybeTlsStream>>() else {
976            return Err(ClientError::Proxy("invalid upgrade".to_string()));
977        };
978
979        let res = chain::chain(std::io::Cursor::new(read_buf), io.into_inner());
980
981        Ok(res)
982    }
983
984    /// Reports whether IPv4 dials should be slightly
985    /// delayed to give IPv6 a better chance of winning dial races.
986    /// Implementations should only return true if IPv6 is expected
987    /// to succeed. (otherwise delaying IPv4 will delay the connection
988    /// overall)
989    async fn prefer_ipv6(&self) -> bool {
990        match self.address_family_selector {
991            Some(ref selector) => selector().await,
992            None => false,
993        }
994    }
995
996    async fn recv_detail(&mut self) -> Result<ReceivedMessage, ClientError> {
997        if let Some((_conn, conn_receiver)) = self.relay_conn.as_mut() {
998            trace!("recv_detail tick");
999            match conn_receiver.recv().await {
1000                Ok(msg) => {
1001                    return Ok(msg);
1002                }
1003                Err(e) => {
1004                    self.close_for_reconnect().await;
1005                    if self.is_closed {
1006                        return Err(ClientError::Closed);
1007                    }
1008                    // TODO(ramfox): more specific error?
1009                    return Err(ClientError::Receive(e));
1010                }
1011            }
1012        }
1013        std::future::pending().await
1014    }
1015
1016    /// Close the underlying relay connection. The next time the client takes some action that
1017    /// requires a connection, it will call `connect`.
1018    async fn close_for_reconnect(&mut self) {
1019        debug!("close for reconnect");
1020        if let Some((conn, _)) = self.relay_conn.take() {
1021            conn.close().await
1022        }
1023    }
1024}
1025
1026fn host_header_value(relay_url: RelayUrl) -> Result<String, ClientError> {
1027    // grab the host, turns e.g. https://example.com:8080/xyz -> example.com.
1028    let relay_url_host = relay_url
1029        .host_str()
1030        .ok_or_else(|| ClientError::InvalidUrl(relay_url.to_string()))?;
1031    // strip the trailing dot, if present: example.com. -> example.com
1032    let relay_url_host = relay_url_host.strip_suffix(".").unwrap_or(relay_url_host);
1033    // build the host header value (reserve up to 6 chars for the ":" and port digits):
1034    let mut host_header_value = String::with_capacity(relay_url_host.len() + 6);
1035    host_header_value += relay_url_host;
1036    if let Some(port) = relay_url.port() {
1037        host_header_value += ":";
1038        host_header_value += &port.to_string();
1039    }
1040    Ok(host_header_value)
1041}
1042
1043async fn resolve_host(
1044    resolver: &DnsResolver,
1045    url: &Url,
1046    prefer_ipv6: bool,
1047) -> Result<IpAddr, ClientError> {
1048    let host = url
1049        .host()
1050        .ok_or_else(|| ClientError::InvalidUrl("missing host".into()))?;
1051    match host {
1052        url::Host::Domain(domain) => {
1053            // Need to do a DNS lookup
1054            let mut addrs = resolver
1055                .lookup_ipv4_ipv6(domain, DNS_TIMEOUT)
1056                .await
1057                .map_err(|e| ClientError::Dns(Some(e)))?
1058                .peekable();
1059
1060            let found = if prefer_ipv6 {
1061                let first = addrs.peek().copied();
1062                addrs.find(IpAddr::is_ipv6).or(first)
1063            } else {
1064                addrs.next()
1065            };
1066
1067            found.ok_or_else(|| ClientError::Dns(None))
1068        }
1069        url::Host::Ipv4(ip) => Ok(IpAddr::V4(ip)),
1070        url::Host::Ipv6(ip) => Ok(IpAddr::V6(ip)),
1071    }
1072}
1073
/// Certificate verifier that accepts every server certificate and handshake
/// signature without checking anything.
///
/// Used to allow self signed certificates in tests; only compiled for tests
/// or with the `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
#[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))]
#[derive(Debug)]
struct NoCertVerifier;
1079
#[cfg(any(test, feature = "test-utils"))]
impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
    /// Accepts the presented certificate chain unconditionally.
    fn verify_server_cert(
        &self,
        _leaf: &rustls::pki_types::CertificateDer,
        _chain: &[rustls::pki_types::CertificateDer],
        _name: &rustls::pki_types::ServerName,
        _stapled_ocsp: &[u8],
        _at: rustls::pki_types::UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        let verified = rustls::client::danger::ServerCertVerified::assertion();
        Ok(verified)
    }

    /// Accepts any TLS 1.2 handshake signature unconditionally.
    fn verify_tls12_signature(
        &self,
        _msg: &[u8],
        _certificate: &rustls::pki_types::CertificateDer<'_>,
        _signature: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        let valid = rustls::client::danger::HandshakeSignatureValid::assertion();
        Ok(valid)
    }

    /// Accepts any TLS 1.3 handshake signature unconditionally.
    fn verify_tls13_signature(
        &self,
        _msg: &[u8],
        _certificate: &rustls::pki_types::CertificateDer<'_>,
        _signature: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        let valid = rustls::client::danger::HandshakeSignatureValid::assertion();
        Ok(valid)
    }

    /// Advertises the signature schemes of the default `ring` crypto provider.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        let provider = rustls::crypto::ring::default_provider();
        provider
            .signature_verification_algorithms
            .supported_schemes()
    }
}
1116
1117fn url_port(url: &Url) -> Option<u16> {
1118    if let Some(port) = url.port() {
1119        return Some(port);
1120    }
1121
1122    match url.scheme() {
1123        "http" => Some(80),
1124        "https" => Some(443),
1125        _ => None,
1126    }
1127}
1128
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use anyhow::{bail, Result};

    use super::*;
    use crate::dns::default_resolver;

    /// A client built for an unreachable relay must not hand a successfully
    /// received message to its receiver.
    #[tokio::test]
    async fn test_recv_detail_connect_error() -> Result<()> {
        let _guard = iroh_test::logging::setup();

        let secret_key = SecretKey::generate();
        let unreachable: Url = "https://bad.url".parse().unwrap();
        let resolver = default_resolver();

        let (_client, mut receiver) =
            ClientBuilder::new(unreachable).build(secret_key.clone(), resolver.clone());

        // ensure that the client will bubble up any connection error & not
        // just loop ad infinitum attempting to connect
        match receiver.recv().await {
            Some(Ok(_)) => {
                bail!("expected client with bad relay node detail to return with an error")
            }
            _ => Ok(()),
        }
    }

    /// The host header drops a trailing dot and keeps an explicit port.
    #[test]
    fn test_host_header_value() -> Result<()> {
        let _guard = iroh_test::logging::setup();

        let check = |input: &str, expected: &str| -> Result<()> {
            let actual = host_header_value(RelayUrl::from_str(input)?)?;
            assert_eq!(actual, expected);
            Ok(())
        };

        check(
            "https://euw1-1.relay.iroh.network.",
            "euw1-1.relay.iroh.network",
        )?;
        check("http://localhost:8080", "localhost:8080")?;

        Ok(())
    }
}