1use std::{
6 net::SocketAddr,
7 pin::Pin,
8 sync::Arc,
9 task::{self, Poll},
10};
11
12use conn::Conn;
13use iroh_base::{RelayUrl, SecretKey};
14use n0_error::{e, stack_error};
15use n0_future::{
16 Sink, Stream,
17 split::{SplitSink, SplitStream, split},
18 time,
19};
20#[cfg(any(test, feature = "test-utils"))]
21use tracing::warn;
22use tracing::{Level, debug, event, trace};
23use url::Url;
24
25pub use self::conn::{RecvError, SendError};
26#[cfg(not(wasm_browser))]
27use crate::dns::{DnsError, DnsResolver};
28use crate::{
29 KeyCache,
30 http::RELAY_PATH,
31 protos::{
32 handshake,
33 relay::{ClientToRelayMsg, RelayToClientMsg},
34 },
35};
36
37pub(crate) mod conn;
38#[cfg(not(wasm_browser))]
39pub(crate) mod streams;
40#[cfg(not(wasm_browser))]
41mod tls;
42#[cfg(not(wasm_browser))]
43mod util;
44
45#[stack_error(derive, add_meta, from_sources)]
50#[allow(missing_docs)]
51#[non_exhaustive]
52pub enum ConnectError {
53 #[error("Invalid URL for websocket: {url}")]
54 InvalidWebsocketUrl { url: Url },
55 #[error("Invalid relay URL: {url}")]
56 InvalidRelayUrl { url: Url },
57 #[error(transparent)]
58 Websocket {
59 #[cfg(not(wasm_browser))]
60 #[error(std_err)]
61 source: tokio_websockets::Error,
62 #[cfg(wasm_browser)]
63 #[error(std_err)]
64 source: ws_stream_wasm::WsErr,
65 },
66 #[error(transparent)]
67 Handshake {
68 #[error(std_err)]
69 source: handshake::Error,
70 },
71 #[error(transparent)]
72 Dial { source: DialError },
73 #[error("Unexpected status during upgrade: {code}")]
74 UnexpectedUpgradeStatus { code: hyper::StatusCode },
75 #[error("Failed to upgrade response")]
76 Upgrade {
77 #[error(std_err)]
78 source: hyper::Error,
79 },
80 #[error("Invalid TLS servername")]
81 InvalidTlsServername {},
82 #[error("No local address available")]
83 NoLocalAddr {},
84 #[error("tls connection failed")]
85 Tls {
86 #[error(std_err)]
87 source: std::io::Error,
88 },
89 #[cfg(wasm_browser)]
90 #[error("The relay protocol is not available in browsers")]
91 RelayProtoNotAvailable {},
92}
93
94#[stack_error(derive, add_meta, from_sources)]
96#[allow(missing_docs)]
97#[non_exhaustive]
98pub enum DialError {
99 #[error("Invalid target port")]
100 InvalidTargetPort {},
101 #[error(transparent)]
102 #[cfg(not(wasm_browser))]
103 Dns { source: DnsError },
104 #[error(transparent)]
105 Timeout {
106 #[error(std_err)]
107 source: time::Elapsed,
108 },
109 #[error(transparent)]
110 Io {
111 #[error(std_err)]
112 source: std::io::Error,
113 },
114 #[error("Invalid URL: {url}")]
115 InvalidUrl { url: Url },
116 #[error("Failed proxy connection: {status}")]
117 ProxyConnectInvalidStatus { status: hyper::StatusCode },
118 #[error("Invalid Proxy URL {proxy_url}")]
119 ProxyInvalidUrl { proxy_url: Url },
120 #[error("failed to establish proxy connection")]
121 ProxyConnect {
122 #[error(std_err)]
123 source: hyper::Error,
124 },
125 #[error("Invalid proxy TLS servername: {proxy_hostname}")]
126 ProxyInvalidTlsServername { proxy_hostname: String },
127 #[error("Invalid proxy target port")]
128 ProxyInvalidTargetPort {},
129}
130
131#[derive(derive_more::Debug, Clone)]
133pub struct ClientBuilder {
134 #[debug("address family selector callback")]
136 address_family_selector: Option<Arc<dyn Fn() -> bool + Send + Sync>>,
137 url: RelayUrl,
139 #[cfg(any(test, feature = "test-utils"))]
141 insecure_skip_cert_verify: bool,
142 proxy_url: Option<Url>,
144 secret_key: SecretKey,
146 #[cfg(not(wasm_browser))]
148 dns_resolver: DnsResolver,
149 key_cache: KeyCache,
151}
152
153impl ClientBuilder {
154 pub fn new(
156 url: impl Into<RelayUrl>,
157 secret_key: SecretKey,
158 #[cfg(not(wasm_browser))] dns_resolver: DnsResolver,
159 ) -> Self {
160 ClientBuilder {
161 address_family_selector: None,
162 url: url.into(),
163
164 #[cfg(any(test, feature = "test-utils"))]
165 insecure_skip_cert_verify: false,
166
167 proxy_url: None,
168 secret_key,
169 #[cfg(not(wasm_browser))]
170 dns_resolver,
171 key_cache: KeyCache::new(128),
172 }
173 }
174
175 pub fn address_family_selector<S>(mut self, selector: S) -> Self
182 where
183 S: Fn() -> bool + Send + Sync + 'static,
184 {
185 self.address_family_selector = Some(Arc::new(selector));
186 self
187 }
188
189 #[cfg(any(test, feature = "test-utils"))]
193 pub fn insecure_skip_cert_verify(mut self, skip: bool) -> Self {
194 self.insecure_skip_cert_verify = skip;
195 self
196 }
197
198 pub fn proxy_url(mut self, url: Url) -> Self {
200 self.proxy_url.replace(url);
201 self
202 }
203
204 pub fn key_cache_capacity(mut self, capacity: usize) -> Self {
206 self.key_cache = KeyCache::new(capacity);
207 self
208 }
209
210 #[cfg(not(wasm_browser))]
212 pub async fn connect(&self) -> Result<Client, ConnectError> {
213 use http::header::SEC_WEBSOCKET_PROTOCOL;
214 use tls::MaybeTlsStreamBuilder;
215
216 use crate::{
217 http::{CLIENT_AUTH_HEADER, RELAY_PROTOCOL_VERSION},
218 protos::{handshake::KeyMaterialClientAuth, relay::MAX_FRAME_SIZE},
219 };
220
221 let mut dial_url = (*self.url).clone();
222 dial_url.set_path(RELAY_PATH);
223 dial_url
226 .set_scheme(match self.url.scheme() {
227 "http" => "ws",
228 "ws" => "ws",
229 _ => "wss",
230 })
231 .map_err(|_| {
232 e!(ConnectError::InvalidWebsocketUrl {
233 url: dial_url.clone()
234 })
235 })?;
236
237 debug!(%dial_url, "Dialing relay by websocket");
238
239 #[allow(unused_mut)]
240 let mut builder = MaybeTlsStreamBuilder::new(dial_url.clone(), self.dns_resolver.clone())
241 .prefer_ipv6(self.prefer_ipv6())
242 .proxy_url(self.proxy_url.clone());
243
244 #[cfg(any(test, feature = "test-utils"))]
245 if self.insecure_skip_cert_verify {
246 builder = builder.insecure_skip_cert_verify(self.insecure_skip_cert_verify);
247 }
248
249 let stream = builder.connect().await?;
250 let local_addr = stream
251 .as_ref()
252 .local_addr()
253 .map_err(|_| e!(ConnectError::NoLocalAddr))?;
254 let mut builder = tokio_websockets::ClientBuilder::new()
255 .uri(dial_url.as_str())
256 .map_err(|_| {
257 e!(ConnectError::InvalidRelayUrl {
258 url: dial_url.clone()
259 })
260 })?
261 .add_header(
262 SEC_WEBSOCKET_PROTOCOL,
263 http::HeaderValue::from_static(RELAY_PROTOCOL_VERSION),
264 )
265 .expect("valid header name and value")
266 .limits(tokio_websockets::Limits::default().max_payload_len(Some(MAX_FRAME_SIZE)))
267 .config(tokio_websockets::Config::default().flush_threshold(usize::MAX));
271 if let Some(client_auth) = KeyMaterialClientAuth::new(&self.secret_key, &stream) {
272 debug!("Using TLS key export for relay client authentication");
273 builder = builder
274 .add_header(CLIENT_AUTH_HEADER, client_auth.into_header_value())
275 .expect(
276 "impossible: CLIENT_AUTH_HEADER isn't a disallowed header value for websockets",
277 );
278 }
279 let (conn, response) = builder.connect_on(stream).await?;
280
281 n0_error::ensure!(
282 response.status() == hyper::StatusCode::SWITCHING_PROTOCOLS,
283 ConnectError::UnexpectedUpgradeStatus {
284 code: response.status()
285 }
286 );
287
288 let conn = Conn::new(conn, self.key_cache.clone(), &self.secret_key).await?;
289
290 event!(
291 target: "events.net.relay.connected",
292 Level::DEBUG,
293 url = %self.url,
294 );
295
296 trace!("connect done");
297
298 Ok(Client {
299 conn,
300 local_addr: Some(local_addr),
301 })
302 }
303
304 #[cfg(not(wasm_browser))]
310 fn prefer_ipv6(&self) -> bool {
311 match self.address_family_selector {
312 Some(ref selector) => selector(),
313 None => false,
314 }
315 }
316
317 #[cfg(wasm_browser)]
319 pub async fn connect(&self) -> Result<Client, ConnectError> {
320 use crate::http::RELAY_PROTOCOL_VERSION;
321
322 let mut dial_url = (*self.url).clone();
323 dial_url.set_path(RELAY_PATH);
324 dial_url
327 .set_scheme(match self.url.scheme() {
328 "http" => "ws",
329 "ws" => "ws",
330 _ => "wss",
331 })
332 .map_err(|_| {
333 e!(ConnectError::InvalidWebsocketUrl {
334 url: dial_url.clone()
335 })
336 })?;
337
338 debug!(%dial_url, "Dialing relay by websocket");
339
340 let (_, ws_stream) =
341 ws_stream_wasm::WsMeta::connect(dial_url.as_str(), Some(vec![RELAY_PROTOCOL_VERSION]))
342 .await?;
343 let conn = Conn::new(ws_stream, self.key_cache.clone(), &self.secret_key).await?;
344
345 event!(
346 target: "events.net.relay.connected",
347 Level::DEBUG,
348 url = %self.url,
349 );
350
351 trace!("connect done");
352
353 Ok(Client {
354 conn,
355 local_addr: None,
356 })
357 }
358}
359
360#[derive(Debug)]
362pub struct Client {
363 conn: Conn,
364 local_addr: Option<SocketAddr>,
365}
366
367impl Client {
368 pub fn split(self) -> (ClientStream, ClientSink) {
370 let (sink, stream) = split(self.conn);
371 (
372 ClientStream {
373 stream,
374 local_addr: self.local_addr,
375 },
376 ClientSink { sink },
377 )
378 }
379}
380
381impl Stream for Client {
382 type Item = Result<RelayToClientMsg, RecvError>;
383
384 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
385 Pin::new(&mut self.conn).poll_next(cx)
386 }
387}
388
389impl Sink<ClientToRelayMsg> for Client {
390 type Error = SendError;
391
392 fn poll_ready(
393 mut self: Pin<&mut Self>,
394 cx: &mut task::Context<'_>,
395 ) -> Poll<Result<(), Self::Error>> {
396 Pin::new(&mut self.conn).poll_ready(cx)
397 }
398
399 fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
400 Pin::new(&mut self.conn).start_send(item)
401 }
402
403 fn poll_flush(
404 mut self: Pin<&mut Self>,
405 cx: &mut task::Context<'_>,
406 ) -> Poll<Result<(), Self::Error>> {
407 Pin::new(&mut self.conn).poll_flush(cx)
408 }
409
410 fn poll_close(
411 mut self: Pin<&mut Self>,
412 cx: &mut task::Context<'_>,
413 ) -> Poll<Result<(), Self::Error>> {
414 Pin::new(&mut self.conn).poll_close(cx)
415 }
416}
417
418#[derive(Debug)]
420pub struct ClientSink {
421 sink: SplitSink<Conn, ClientToRelayMsg>,
422}
423
424impl Sink<ClientToRelayMsg> for ClientSink {
425 type Error = SendError;
426
427 fn poll_ready(
428 mut self: Pin<&mut Self>,
429 cx: &mut task::Context<'_>,
430 ) -> Poll<Result<(), Self::Error>> {
431 Pin::new(&mut self.sink).poll_ready(cx)
432 }
433
434 fn start_send(mut self: Pin<&mut Self>, item: ClientToRelayMsg) -> Result<(), Self::Error> {
435 Pin::new(&mut self.sink).start_send(item)
436 }
437
438 fn poll_flush(
439 mut self: Pin<&mut Self>,
440 cx: &mut task::Context<'_>,
441 ) -> Poll<Result<(), Self::Error>> {
442 Pin::new(&mut self.sink).poll_flush(cx)
443 }
444
445 fn poll_close(
446 mut self: Pin<&mut Self>,
447 cx: &mut task::Context<'_>,
448 ) -> Poll<Result<(), Self::Error>> {
449 Pin::new(&mut self.sink).poll_close(cx)
450 }
451}
452
453#[derive(Debug)]
455pub struct ClientStream {
456 stream: SplitStream<Conn>,
457 local_addr: Option<SocketAddr>,
458}
459
460impl ClientStream {
461 pub fn local_addr(&self) -> Option<SocketAddr> {
463 self.local_addr
464 }
465}
466
467impl Stream for ClientStream {
468 type Item = Result<RelayToClientMsg, RecvError>;
469
470 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
471 Pin::new(&mut self.stream).poll_next(cx)
472 }
473}
474
475#[cfg(any(test, feature = "test-utils"))]
476pub fn make_dangerous_client_config() -> rustls::ClientConfig {
480 warn!(
481 "Insecure config: SSL certificates from relay servers will be trusted without verification"
482 );
483 rustls::client::ClientConfig::builder_with_provider(Arc::new(
484 rustls::crypto::ring::default_provider(),
485 ))
486 .with_protocol_versions(&[&rustls::version::TLS13])
487 .expect("protocols supported by ring")
488 .dangerous()
489 .with_custom_certificate_verifier(Arc::new(NoCertVerifier))
490 .with_no_client_auth()
491}
492
493#[cfg(any(test, feature = "test-utils"))]
495#[derive(Debug)]
496struct NoCertVerifier;
497
498#[cfg(any(test, feature = "test-utils"))]
499impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
500 fn verify_server_cert(
501 &self,
502 _end_entity: &rustls::pki_types::CertificateDer,
503 _intermediates: &[rustls::pki_types::CertificateDer],
504 _server_name: &rustls::pki_types::ServerName,
505 _ocsp_response: &[u8],
506 _now: rustls::pki_types::UnixTime,
507 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
508 Ok(rustls::client::danger::ServerCertVerified::assertion())
509 }
510 fn verify_tls12_signature(
511 &self,
512 _message: &[u8],
513 _cert: &rustls::pki_types::CertificateDer<'_>,
514 _dss: &rustls::DigitallySignedStruct,
515 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
516 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
517 }
518
519 fn verify_tls13_signature(
520 &self,
521 _message: &[u8],
522 _cert: &rustls::pki_types::CertificateDer<'_>,
523 _dss: &rustls::DigitallySignedStruct,
524 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
525 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
526 }
527
528 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
529 rustls::crypto::ring::default_provider()
530 .signature_verification_algorithms
531 .supported_schemes()
532 }
533}