iroh_relay/
client.rs

1//! Exposes [`Client`], which allows to establish connections to a relay server.
2//!
3//! Based on tailscale/derp/derphttp/derphttp_client.go
4
5use std::{
6    net::SocketAddr,
7    pin::Pin,
8    sync::Arc,
9    task::{self, Poll},
10};
11
12use conn::Conn;
13use iroh_base::{RelayUrl, SecretKey};
14use n0_error::{e, stack_error};
15use n0_future::{
16    Sink, Stream,
17    split::{SplitSink, SplitStream, split},
18    time,
19};
20#[cfg(any(test, feature = "test-utils"))]
21use tracing::warn;
22use tracing::{Level, debug, event, trace};
23use url::Url;
24
25pub use self::conn::{RecvError, SendError};
26#[cfg(not(wasm_browser))]
27use crate::dns::{DnsError, DnsResolver};
28use crate::{
29    KeyCache,
30    http::RELAY_PATH,
31    protos::{
32        handshake,
33        relay::{ClientToRelayMsg, RelayToClientMsg},
34    },
35};
36
37pub(crate) mod conn;
38#[cfg(not(wasm_browser))]
39pub(crate) mod streams;
40#[cfg(not(wasm_browser))]
41mod tls;
42#[cfg(not(wasm_browser))]
43mod util;
44
45/// Connection errors.
46///
47/// `ConnectError` contains `DialError`, errors that can occur while dialing the
48/// relay, as well as errors that occur while creating or maintaining a connection.
49#[stack_error(derive, add_meta, from_sources)]
50#[allow(missing_docs)]
51#[non_exhaustive]
52pub enum ConnectError {
53    #[error("Invalid URL for websocket: {url}")]
54    InvalidWebsocketUrl { url: Url },
55    #[error("Invalid relay URL: {url}")]
56    InvalidRelayUrl { url: Url },
57    #[error(transparent)]
58    Websocket {
59        #[cfg(not(wasm_browser))]
60        #[error(std_err)]
61        source: tokio_websockets::Error,
62        #[cfg(wasm_browser)]
63        #[error(std_err)]
64        source: ws_stream_wasm::WsErr,
65    },
66    #[error(transparent)]
67    Handshake {
68        #[error(std_err)]
69        source: handshake::Error,
70    },
71    #[error(transparent)]
72    Dial { source: DialError },
73    #[error("Unexpected status during upgrade: {code}")]
74    UnexpectedUpgradeStatus { code: hyper::StatusCode },
75    #[error("Failed to upgrade response")]
76    Upgrade {
77        #[error(std_err)]
78        source: hyper::Error,
79    },
80    #[error("Invalid TLS servername")]
81    InvalidTlsServername {},
82    #[error("No local address available")]
83    NoLocalAddr {},
84    #[error("tls connection failed")]
85    Tls {
86        #[error(std_err)]
87        source: std::io::Error,
88    },
89    #[cfg(wasm_browser)]
90    #[error("The relay protocol is not available in browsers")]
91    RelayProtoNotAvailable {},
92}
93
94/// Errors that can occur while dialing the relay server.
95#[stack_error(derive, add_meta, from_sources)]
96#[allow(missing_docs)]
97#[non_exhaustive]
98pub enum DialError {
99    #[error("Invalid target port")]
100    InvalidTargetPort {},
101    #[error(transparent)]
102    #[cfg(not(wasm_browser))]
103    Dns { source: DnsError },
104    #[error(transparent)]
105    Timeout {
106        #[error(std_err)]
107        source: time::Elapsed,
108    },
109    #[error(transparent)]
110    Io {
111        #[error(std_err)]
112        source: std::io::Error,
113    },
114    #[error("Invalid URL: {url}")]
115    InvalidUrl { url: Url },
116    #[error("Failed proxy connection: {status}")]
117    ProxyConnectInvalidStatus { status: hyper::StatusCode },
118    #[error("Invalid Proxy URL {proxy_url}")]
119    ProxyInvalidUrl { proxy_url: Url },
120    #[error("failed to establish proxy connection")]
121    ProxyConnect {
122        #[error(std_err)]
123        source: hyper::Error,
124    },
125    #[error("Invalid proxy TLS servername: {proxy_hostname}")]
126    ProxyInvalidTlsServername { proxy_hostname: String },
127    #[error("Invalid proxy target port")]
128    ProxyInvalidTargetPort {},
129}
130
131/// Build a Client.
132#[derive(derive_more::Debug, Clone)]
133pub struct ClientBuilder {
134    /// Default is None
135    #[debug("address family selector callback")]
136    address_family_selector: Option<Arc<dyn Fn() -> bool + Send + Sync>>,
137    /// Server url.
138    url: RelayUrl,
139    /// Allow self-signed certificates from relay servers
140    #[cfg(any(test, feature = "test-utils"))]
141    insecure_skip_cert_verify: bool,
142    /// HTTP Proxy
143    proxy_url: Option<Url>,
144    /// The secret key of this client.
145    secret_key: SecretKey,
146    /// The DNS resolver to use.
147    #[cfg(not(wasm_browser))]
148    dns_resolver: DnsResolver,
149    /// Cache for public keys of remote endpoints.
150    key_cache: KeyCache,
151}
152
153impl ClientBuilder {
154    /// Create a new [`ClientBuilder`]
155    pub fn new(
156        url: impl Into<RelayUrl>,
157        secret_key: SecretKey,
158        #[cfg(not(wasm_browser))] dns_resolver: DnsResolver,
159    ) -> Self {
160        ClientBuilder {
161            address_family_selector: None,
162            url: url.into(),
163
164            #[cfg(any(test, feature = "test-utils"))]
165            insecure_skip_cert_verify: false,
166
167            proxy_url: None,
168            secret_key,
169            #[cfg(not(wasm_browser))]
170            dns_resolver,
171            key_cache: KeyCache::new(128),
172        }
173    }
174
175    /// Returns if we should prefer ipv6
176    /// it replaces the relayhttp.AddressFamilySelector we pass
177    /// It provides the hint as to whether in an IPv4-vs-IPv6 race that
178    /// IPv4 should be held back a bit to give IPv6 a better-than-50/50
179    /// chance of winning. We only return true when we believe IPv6 will
180    /// work anyway, so we don't artificially delay the connection speed.
181    pub fn address_family_selector<S>(mut self, selector: S) -> Self
182    where
183        S: Fn() -> bool + Send + Sync + 'static,
184    {
185        self.address_family_selector = Some(Arc::new(selector));
186        self
187    }
188
189    /// Skip the verification of the relay server's SSL certificates.
190    ///
191    /// May only be used in tests.
192    #[cfg(any(test, feature = "test-utils"))]
193    pub fn insecure_skip_cert_verify(mut self, skip: bool) -> Self {
194        self.insecure_skip_cert_verify = skip;
195        self
196    }
197
198    /// Set an explicit proxy url to proxy all HTTP(S) traffic through.
199    pub fn proxy_url(mut self, url: Url) -> Self {
200        self.proxy_url.replace(url);
201        self
202    }
203
204    /// Set the capacity of the cache for public keys.
205    pub fn key_cache_capacity(mut self, capacity: usize) -> Self {
206        self.key_cache = KeyCache::new(capacity);
207        self
208    }
209
210    /// Establishes a new connection to the relay server.
211    #[cfg(not(wasm_browser))]
212    pub async fn connect(&self) -> Result<Client, ConnectError> {
213        use http::header::SEC_WEBSOCKET_PROTOCOL;
214        use tls::MaybeTlsStreamBuilder;
215
216        use crate::{
217            http::{CLIENT_AUTH_HEADER, RELAY_PROTOCOL_VERSION},
218            protos::{handshake::KeyMaterialClientAuth, relay::MAX_FRAME_SIZE},
219        };
220
221        let mut dial_url = (*self.url).clone();
222        dial_url.set_path(RELAY_PATH);
223        // The relay URL is exchanged with the http(s) scheme in tickets and similar.
224        // We need to use the ws:// or wss:// schemes when connecting with websockets, though.
225        dial_url
226            .set_scheme(match self.url.scheme() {
227                "http" => "ws",
228                "ws" => "ws",
229                _ => "wss",
230            })
231            .map_err(|_| {
232                e!(ConnectError::InvalidWebsocketUrl {
233                    url: dial_url.clone()
234                })
235            })?;
236
237        debug!(%dial_url, "Dialing relay by websocket");
238
239        #[allow(unused_mut)]
240        let mut builder = MaybeTlsStreamBuilder::new(dial_url.clone(), self.dns_resolver.clone())
241            .prefer_ipv6(self.prefer_ipv6())
242            .proxy_url(self.proxy_url.clone());
243
244        #[cfg(any(test, feature = "test-utils"))]
245        if self.insecure_skip_cert_verify {
246            builder = builder.insecure_skip_cert_verify(self.insecure_skip_cert_verify);
247        }
248
249        let stream = builder.connect().await?;
250        let local_addr = stream
251            .as_ref()
252            .local_addr()
253            .map_err(|_| e!(ConnectError::NoLocalAddr))?;
254        let mut builder = tokio_websockets::ClientBuilder::new()
255            .uri(dial_url.as_str())
256            .map_err(|_| {
257                e!(ConnectError::InvalidRelayUrl {
258                    url: dial_url.clone()
259                })
260            })?
261            .add_header(
262                SEC_WEBSOCKET_PROTOCOL,
263                http::HeaderValue::from_static(RELAY_PROTOCOL_VERSION),
264            )
265            .expect("valid header name and value")
266            .limits(tokio_websockets::Limits::default().max_payload_len(Some(MAX_FRAME_SIZE)))
267            // We turn off automatic flushing after a threshold (the default would be after 8KB).
268            // This means we need to flush manually, which we do by calling `Sink::send_all` or
269            // `Sink::send` (which calls `Sink::flush`) in the `ActiveRelayActor`.
270            .config(tokio_websockets::Config::default().flush_threshold(usize::MAX));
271        if let Some(client_auth) = KeyMaterialClientAuth::new(&self.secret_key, &stream) {
272            debug!("Using TLS key export for relay client authentication");
273            builder = builder
274                .add_header(CLIENT_AUTH_HEADER, client_auth.into_header_value())
275                .expect(
276                    "impossible: CLIENT_AUTH_HEADER isn't a disallowed header value for websockets",
277                );
278        }
279        let (conn, response) = builder.connect_on(stream).await?;
280
281        n0_error::ensure!(
282            response.status() == hyper::StatusCode::SWITCHING_PROTOCOLS,
283            ConnectError::UnexpectedUpgradeStatus {
284                code: response.status()
285            }
286        );
287
288        let conn = Conn::new(conn, self.key_cache.clone(), &self.secret_key).await?;
289
290        event!(
291            target: "events.net.relay.connected",
292            Level::DEBUG,
293            url = %self.url,
294        );
295
296        trace!("connect done");
297
298        Ok(Client {
299            conn,
300            local_addr: Some(local_addr),
301        })
302    }
303
304    /// Reports whether IPv4 dials should be slightly
305    /// delayed to give IPv6 a better chance of winning dial races.
306    /// Implementations should only return true if IPv6 is expected
307    /// to succeed. (otherwise delaying IPv4 will delay the connection
308    /// overall)
309    #[cfg(not(wasm_browser))]
310    fn prefer_ipv6(&self) -> bool {
311        match self.address_family_selector {
312            Some(ref selector) => selector(),
313            None => false,
314        }
315    }
316
317    /// Establishes a new connection to the relay server.
318    #[cfg(wasm_browser)]
319    pub async fn connect(&self) -> Result<Client, ConnectError> {
320        use crate::http::RELAY_PROTOCOL_VERSION;
321
322        let mut dial_url = (*self.url).clone();
323        dial_url.set_path(RELAY_PATH);
324        // The relay URL is exchanged with the http(s) scheme in tickets and similar.
325        // We need to use the ws:// or wss:// schemes when connecting with websockets, though.
326        dial_url
327            .set_scheme(match self.url.scheme() {
328                "http" => "ws",
329                "ws" => "ws",
330                _ => "wss",
331            })
332            .map_err(|_| {
333                e!(ConnectError::InvalidWebsocketUrl {
334                    url: dial_url.clone()
335                })
336            })?;
337
338        debug!(%dial_url, "Dialing relay by websocket");
339
340        let (_, ws_stream) =
341            ws_stream_wasm::WsMeta::connect(dial_url.as_str(), Some(vec![RELAY_PROTOCOL_VERSION]))
342                .await?;
343        let conn = Conn::new(ws_stream, self.key_cache.clone(), &self.secret_key).await?;
344
345        event!(
346            target: "events.net.relay.connected",
347            Level::DEBUG,
348            url = %self.url,
349        );
350
351        trace!("connect done");
352
353        Ok(Client {
354            conn,
355            local_addr: None,
356        })
357    }
358}
359
360/// A relay client.
361#[derive(Debug)]
362pub struct Client {
363    conn: Conn,
364    local_addr: Option<SocketAddr>,
365}
366
367impl Client {
368    /// Splits the client into a sink and a stream.
369    pub fn split(self) -> (ClientStream, ClientSink) {
370        let (sink, stream) = split(self.conn);
371        (
372            ClientStream {
373                stream,
374                local_addr: self.local_addr,
375            },
376            ClientSink { sink },
377        )
378    }
379}
380
381impl Stream for Client {
382    type Item = Result<RelayToClientMsg, RecvError>;
383
384    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
385        Pin::new(&mut self.conn).poll_next(cx)
386    }
387}
388
389impl Sink<ClientToRelayMsg> for Client {
390    type Error = SendError;
391
392    fn poll_ready(
393        mut self: Pin<&mut Self>,
394        cx: &mut task::Context<'_>,
395    ) -> Poll<Result<(), Self::Error>> {
396        Pin::new(&mut self.conn).poll_ready(cx)
397    }
398
399    fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
400        Pin::new(&mut self.conn).start_send(item)
401    }
402
403    fn poll_flush(
404        mut self: Pin<&mut Self>,
405        cx: &mut task::Context<'_>,
406    ) -> Poll<Result<(), Self::Error>> {
407        Pin::new(&mut self.conn).poll_flush(cx)
408    }
409
410    fn poll_close(
411        mut self: Pin<&mut Self>,
412        cx: &mut task::Context<'_>,
413    ) -> Poll<Result<(), Self::Error>> {
414        Pin::new(&mut self.conn).poll_close(cx)
415    }
416}
417
418/// The send half of a relay client.
419#[derive(Debug)]
420pub struct ClientSink {
421    sink: SplitSink<Conn, ClientToRelayMsg>,
422}
423
424impl Sink<ClientToRelayMsg> for ClientSink {
425    type Error = SendError;
426
427    fn poll_ready(
428        mut self: Pin<&mut Self>,
429        cx: &mut task::Context<'_>,
430    ) -> Poll<Result<(), Self::Error>> {
431        Pin::new(&mut self.sink).poll_ready(cx)
432    }
433
434    fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
435        Pin::new(&mut self.sink).start_send(item)
436    }
437
438    fn poll_flush(
439        mut self: Pin<&mut Self>,
440        cx: &mut task::Context<'_>,
441    ) -> Poll<Result<(), Self::Error>> {
442        Pin::new(&mut self.sink).poll_flush(cx)
443    }
444
445    fn poll_close(
446        mut self: Pin<&mut Self>,
447        cx: &mut task::Context<'_>,
448    ) -> Poll<Result<(), Self::Error>> {
449        Pin::new(&mut self.sink).poll_close(cx)
450    }
451}
452
453/// The receive half of a relay client.
454#[derive(Debug)]
455pub struct ClientStream {
456    stream: SplitStream<Conn>,
457    local_addr: Option<SocketAddr>,
458}
459
460impl ClientStream {
461    /// Returns the local address of the client.
462    pub fn local_addr(&self) -> Option<SocketAddr> {
463        self.local_addr
464    }
465}
466
467impl Stream for ClientStream {
468    type Item = Result<RelayToClientMsg, RecvError>;
469
470    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
471        Pin::new(&mut self.stream).poll_next(cx)
472    }
473}
474
475#[cfg(any(test, feature = "test-utils"))]
476/// Creates a client config that trusts any servers without verifying their TLS certificate.
477///
478/// Should be used for testing local relay setups only.
479pub fn make_dangerous_client_config() -> rustls::ClientConfig {
480    warn!(
481        "Insecure config: SSL certificates from relay servers will be trusted without verification"
482    );
483    rustls::client::ClientConfig::builder_with_provider(Arc::new(
484        rustls::crypto::ring::default_provider(),
485    ))
486    .with_protocol_versions(&[&rustls::version::TLS13])
487    .expect("protocols supported by ring")
488    .dangerous()
489    .with_custom_certificate_verifier(Arc::new(NoCertVerifier))
490    .with_no_client_auth()
491}
492
493/// Used to allow self signed certificates in tests
494#[cfg(any(test, feature = "test-utils"))]
495#[derive(Debug)]
496struct NoCertVerifier;
497
498#[cfg(any(test, feature = "test-utils"))]
499impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
500    fn verify_server_cert(
501        &self,
502        _end_entity: &rustls::pki_types::CertificateDer,
503        _intermediates: &[rustls::pki_types::CertificateDer],
504        _server_name: &rustls::pki_types::ServerName,
505        _ocsp_response: &[u8],
506        _now: rustls::pki_types::UnixTime,
507    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
508        Ok(rustls::client::danger::ServerCertVerified::assertion())
509    }
510    fn verify_tls12_signature(
511        &self,
512        _message: &[u8],
513        _cert: &rustls::pki_types::CertificateDer<'_>,
514        _dss: &rustls::DigitallySignedStruct,
515    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
516        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
517    }
518
519    fn verify_tls13_signature(
520        &self,
521        _message: &[u8],
522        _cert: &rustls::pki_types::CertificateDer<'_>,
523        _dss: &rustls::DigitallySignedStruct,
524    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
525        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
526    }
527
528    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
529        rustls::crypto::ring::default_provider()
530            .signature_verification_algorithms
531            .supported_schemes()
532    }
533}