deno_fetch/
lib.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3pub mod dns;
4mod fs_fetch_handler;
5mod proxy;
6#[cfg(test)]
7mod tests;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::cmp::min;
12use std::convert::From;
13use std::future;
14use std::future::Future;
15use std::net::IpAddr;
16use std::path::Path;
17#[cfg(not(windows))]
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24
25use bytes::Bytes;
26// Re-export data_url
27pub use data_url;
28use data_url::DataUrl;
29use deno_core::AsyncRefCell;
30use deno_core::AsyncResult;
31use deno_core::BufView;
32use deno_core::ByteString;
33use deno_core::CancelFuture;
34use deno_core::CancelHandle;
35use deno_core::CancelTryFuture;
36use deno_core::Canceled;
37use deno_core::JsBuffer;
38use deno_core::OpState;
39use deno_core::RcRef;
40use deno_core::Resource;
41use deno_core::ResourceId;
42use deno_core::futures::FutureExt;
43use deno_core::futures::Stream;
44use deno_core::futures::StreamExt;
45use deno_core::futures::TryFutureExt;
46use deno_core::futures::stream::Peekable;
47use deno_core::op2;
48use deno_core::url;
49use deno_core::url::Url;
50use deno_core::v8;
51use deno_error::JsErrorBox;
52pub use deno_fs::FsError;
53use deno_path_util::PathToUrlError;
54use deno_permissions::CheckedPath;
55use deno_permissions::OpenAccessKind;
56use deno_permissions::PermissionCheckError;
57use deno_tls::Proxy;
58use deno_tls::RootCertStoreProvider;
59use deno_tls::SocketUse;
60use deno_tls::TlsKey;
61use deno_tls::TlsKeys;
62use deno_tls::TlsKeysHolder;
63use deno_tls::rustls::RootCertStore;
64pub use fs_fetch_handler::FsFetchHandler;
65use http::Extensions;
66use http::HeaderMap;
67use http::Method;
68use http::Uri;
69use http::header::ACCEPT;
70use http::header::ACCEPT_ENCODING;
71use http::header::AUTHORIZATION;
72use http::header::CONTENT_LENGTH;
73use http::header::HOST;
74use http::header::HeaderName;
75use http::header::HeaderValue;
76use http::header::PROXY_AUTHORIZATION;
77use http::header::RANGE;
78use http::header::USER_AGENT;
79use http_body_util::BodyExt;
80use http_body_util::combinators::BoxBody;
81use hyper::body::Frame;
82use hyper_util::client::legacy::Builder as HyperClientBuilder;
83use hyper_util::client::legacy::connect::Connection;
84use hyper_util::client::legacy::connect::HttpConnector;
85use hyper_util::client::legacy::connect::HttpInfo;
86use hyper_util::rt::TokioExecutor;
87use hyper_util::rt::TokioIo;
88use hyper_util::rt::TokioTimer;
89pub use proxy::basic_auth;
90use serde::Deserialize;
91use serde::Serialize;
92use tower::BoxError;
93use tower::Service;
94use tower::ServiceExt;
95use tower::retry;
96use tower_http::decompression::Decompression;
97
98#[derive(Clone)]
99pub struct Options {
100  pub user_agent: String,
101  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
102  pub proxy: Option<Proxy>,
103  /// A callback to customize HTTP client configuration.
104  ///
105  /// The settings applied with this hook may be overridden by the options
106  /// provided through `Deno.createHttpClient()` API. For instance, if the hook
107  /// calls [`hyper_util::client::legacy::Builder::pool_max_idle_per_host`] with
108  /// a value of 99, and a user calls `Deno.createHttpClient({ poolMaxIdlePerHost: 42 })`,
109  /// the value that will take effect is 42.
110  ///
111  /// For more info on what can be configured, see [`hyper_util::client::legacy::Builder`].
112  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
113  #[allow(clippy::type_complexity)]
114  pub request_builder_hook:
115    Option<fn(&mut http::Request<ReqBody>) -> Result<(), JsErrorBox>>,
116  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
117  pub client_cert_chain_and_key: TlsKeys,
118  pub file_fetch_handler: Rc<dyn FetchHandler>,
119  pub resolver: dns::Resolver,
120}
121
122impl Options {
123  pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
124    Ok(match &self.root_cert_store_provider {
125      Some(provider) => Some(provider.get_or_try_init()?.clone()),
126      None => None,
127    })
128  }
129}
130
131impl Default for Options {
132  fn default() -> Self {
133    Self {
134      user_agent: "".to_string(),
135      root_cert_store_provider: None,
136      proxy: None,
137      client_builder_hook: None,
138      request_builder_hook: None,
139      unsafely_ignore_certificate_errors: None,
140      client_cert_chain_and_key: TlsKeys::Null,
141      file_fetch_handler: Rc::new(DefaultFileFetchHandler),
142      resolver: dns::Resolver::default(),
143    }
144  }
145}
146
147deno_core::extension!(deno_fetch,
148  deps = [ deno_webidl, deno_web, deno_url, deno_console ],
149  parameters = [FP: FetchPermissions],
150  ops = [
151    op_fetch<FP>,
152    op_fetch_send,
153    op_utf8_to_byte_string,
154    op_fetch_custom_client<FP>,
155    op_fetch_promise_is_settled,
156  ],
157  esm = [
158    "20_headers.js",
159    "21_formdata.js",
160    "22_body.js",
161    "22_http_client.js",
162    "23_request.js",
163    "23_response.js",
164    "26_fetch.js",
165    "27_eventsource.js"
166  ],
167  options = {
168    options: Options,
169  },
170  state = |state, options| {
171    state.put::<Options>(options.options);
172  },
173);
174
175#[derive(Debug, thiserror::Error, deno_error::JsError)]
176pub enum FetchError {
177  #[class(inherit)]
178  #[error(transparent)]
179  Resource(#[from] deno_core::error::ResourceError),
180  #[class(inherit)]
181  #[error(transparent)]
182  Permission(#[from] PermissionCheckError),
183  #[class(type)]
184  #[error("NetworkError when attempting to fetch resource")]
185  NetworkError,
186  #[class(type)]
187  #[error("Fetching files only supports the GET method: received {0}")]
188  FsNotGet(Method),
189  #[class(inherit)]
190  #[error(transparent)]
191  PathToUrl(#[from] PathToUrlError),
192  #[class(type)]
193  #[error("Invalid URL {0}")]
194  InvalidUrl(Url),
195  #[class(type)]
196  #[error(transparent)]
197  InvalidHeaderName(#[from] http::header::InvalidHeaderName),
198  #[class(type)]
199  #[error(transparent)]
200  InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
201  #[class(type)]
202  #[error("{0:?}")]
203  DataUrl(data_url::DataUrlError),
204  #[class(type)]
205  #[error("{0:?}")]
206  Base64(data_url::forgiving_base64::InvalidBase64),
207  #[class(type)]
208  #[error("Blob for the given URL not found.")]
209  BlobNotFound,
210  #[class(type)]
211  #[error("Url scheme '{0}' not supported")]
212  SchemeNotSupported(String),
213  #[class(type)]
214  #[error("Request was cancelled")]
215  RequestCanceled,
216  #[class(generic)]
217  #[error(transparent)]
218  Http(#[from] http::Error),
219  #[class(inherit)]
220  #[error(transparent)]
221  ClientCreate(#[from] HttpClientCreateError),
222  #[class(inherit)]
223  #[error(transparent)]
224  Url(#[from] url::ParseError),
225  #[class(type)]
226  #[error(transparent)]
227  Method(#[from] http::method::InvalidMethod),
228  #[class(inherit)]
229  #[error(transparent)]
230  ClientSend(#[from] ClientSendError),
231  #[class(inherit)]
232  #[error(transparent)]
233  RequestBuilderHook(JsErrorBox),
234  #[class(inherit)]
235  #[error(transparent)]
236  Io(#[from] std::io::Error),
237  #[class(generic)]
238  #[error(transparent)]
239  Dns(hickory_resolver::ResolveError),
240  #[class(generic)]
241  #[error(transparent)]
242  PermissionCheck(PermissionCheckError),
243}
244
245impl From<deno_fs::FsError> for FetchError {
246  fn from(value: deno_fs::FsError) -> Self {
247    match value {
248      deno_fs::FsError::Io(_)
249      | deno_fs::FsError::FileBusy
250      | deno_fs::FsError::NotSupported => FetchError::NetworkError,
251      deno_fs::FsError::PermissionCheck(err) => {
252        FetchError::PermissionCheck(err)
253      }
254    }
255  }
256}
257
258pub type CancelableResponseFuture =
259  Pin<Box<dyn Future<Output = CancelableResponseResult>>>;
260
261pub trait FetchHandler: dyn_clone::DynClone {
262  // Return the result of the fetch request consisting of a tuple of the
263  // cancelable response result, the optional fetch body resource and the
264  // optional cancel handle.
265  fn fetch_file(
266    &self,
267    state: &mut OpState,
268    url: &Url,
269  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
270}
271
272dyn_clone::clone_trait_object!(FetchHandler);
273
274/// A default implementation which will error for every request.
275#[derive(Clone)]
276pub struct DefaultFileFetchHandler;
277
278impl FetchHandler for DefaultFileFetchHandler {
279  fn fetch_file(
280    &self,
281    _state: &mut OpState,
282    _url: &Url,
283  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
284    let fut = async move { Ok(Err(FetchError::NetworkError)) };
285    (Box::pin(fut), None)
286  }
287}
288
289#[derive(Serialize)]
290#[serde(rename_all = "camelCase")]
291pub struct FetchReturn {
292  pub request_rid: ResourceId,
293  pub cancel_handle_rid: Option<ResourceId>,
294}
295
296pub fn get_or_create_client_from_state(
297  state: &mut OpState,
298) -> Result<Client, HttpClientCreateError> {
299  if let Some(client) = state.try_borrow::<Client>() {
300    Ok(client.clone())
301  } else {
302    let options = state.borrow::<Options>();
303    let client = create_client_from_options(options)?;
304    state.put::<Client>(client.clone());
305    Ok(client)
306  }
307}
308
309pub fn create_client_from_options(
310  options: &Options,
311) -> Result<Client, HttpClientCreateError> {
312  create_http_client(
313    &options.user_agent,
314    CreateHttpClientOptions {
315      root_cert_store: options
316        .root_cert_store()
317        .map_err(HttpClientCreateError::RootCertStore)?,
318      ca_certs: vec![],
319      proxy: options.proxy.clone(),
320      dns_resolver: options.resolver.clone(),
321      unsafely_ignore_certificate_errors: options
322        .unsafely_ignore_certificate_errors
323        .clone(),
324      client_cert_chain_and_key: options
325        .client_cert_chain_and_key
326        .clone()
327        .try_into()
328        .unwrap_or_default(),
329      pool_max_idle_per_host: None,
330      pool_idle_timeout: None,
331      http1: true,
332      http2: true,
333      local_address: None,
334      client_builder_hook: options.client_builder_hook,
335    },
336  )
337}
338
339#[allow(clippy::type_complexity)]
340pub struct ResourceToBodyAdapter(
341  Rc<dyn Resource>,
342  Option<Pin<Box<dyn Future<Output = Result<BufView, JsErrorBox>>>>>,
343);
344
345impl ResourceToBodyAdapter {
346  pub fn new(resource: Rc<dyn Resource>) -> Self {
347    let future = resource.clone().read(64 * 1024);
348    Self(resource, Some(future))
349  }
350}
351
352// SAFETY: we only use this on a single-threaded executor
353unsafe impl Send for ResourceToBodyAdapter {}
354// SAFETY: we only use this on a single-threaded executor
355unsafe impl Sync for ResourceToBodyAdapter {}
356
357impl Stream for ResourceToBodyAdapter {
358  type Item = Result<Bytes, JsErrorBox>;
359
360  fn poll_next(
361    self: Pin<&mut Self>,
362    cx: &mut Context<'_>,
363  ) -> Poll<Option<Self::Item>> {
364    let this = self.get_mut();
365    match this.1.take() {
366      Some(mut fut) => match fut.poll_unpin(cx) {
367        Poll::Pending => {
368          this.1 = Some(fut);
369          Poll::Pending
370        }
371        Poll::Ready(res) => match res {
372          Ok(buf) if buf.is_empty() => Poll::Ready(None),
373          Ok(buf) => {
374            this.1 = Some(this.0.clone().read(64 * 1024));
375            Poll::Ready(Some(Ok(buf.to_vec().into())))
376          }
377          Err(err) => Poll::Ready(Some(Err(err))),
378        },
379      },
380      _ => Poll::Ready(None),
381    }
382  }
383}
384
385impl hyper::body::Body for ResourceToBodyAdapter {
386  type Data = Bytes;
387  type Error = JsErrorBox;
388
389  fn poll_frame(
390    self: Pin<&mut Self>,
391    cx: &mut Context<'_>,
392  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
393    match self.poll_next(cx) {
394      Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
395      Poll::Ready(None) => Poll::Ready(None),
396      Poll::Pending => Poll::Pending,
397    }
398  }
399}
400
401impl Drop for ResourceToBodyAdapter {
402  fn drop(&mut self) {
403    self.0.clone().close()
404  }
405}
406
407pub trait FetchPermissions {
408  fn check_net(
409    &mut self,
410    host: &str,
411    port: u16,
412    api_name: &str,
413  ) -> Result<(), PermissionCheckError>;
414  fn check_net_url(
415    &mut self,
416    url: &Url,
417    api_name: &str,
418  ) -> Result<(), PermissionCheckError>;
419  #[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"]
420  fn check_open<'a>(
421    &mut self,
422    path: Cow<'a, Path>,
423    open_access: OpenAccessKind,
424    api_name: &str,
425  ) -> Result<CheckedPath<'a>, PermissionCheckError>;
426  fn check_net_vsock(
427    &mut self,
428    cid: u32,
429    port: u32,
430    api_name: &str,
431  ) -> Result<(), PermissionCheckError>;
432}
433
434impl FetchPermissions for deno_permissions::PermissionsContainer {
435  #[inline(always)]
436  fn check_net(
437    &mut self,
438    host: &str,
439    port: u16,
440    api_name: &str,
441  ) -> Result<(), PermissionCheckError> {
442    deno_permissions::PermissionsContainer::check_net(
443      self,
444      &(host, Some(port)),
445      api_name,
446    )
447  }
448
449  #[inline(always)]
450  fn check_net_url(
451    &mut self,
452    url: &Url,
453    api_name: &str,
454  ) -> Result<(), PermissionCheckError> {
455    deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
456  }
457
458  #[inline(always)]
459  fn check_open<'a>(
460    &mut self,
461    path: Cow<'a, Path>,
462    open_access: OpenAccessKind,
463    api_name: &str,
464  ) -> Result<CheckedPath<'a>, PermissionCheckError> {
465    deno_permissions::PermissionsContainer::check_open(
466      self,
467      path,
468      open_access,
469      Some(api_name),
470    )
471  }
472
473  #[inline(always)]
474  fn check_net_vsock(
475    &mut self,
476    cid: u32,
477    port: u32,
478    api_name: &str,
479  ) -> Result<(), PermissionCheckError> {
480    deno_permissions::PermissionsContainer::check_net_vsock(
481      self, cid, port, api_name,
482    )
483  }
484}
485
486#[op2(stack_trace)]
487#[serde]
488#[allow(clippy::too_many_arguments)]
489#[allow(clippy::large_enum_variant)]
490#[allow(clippy::result_large_err)]
491pub fn op_fetch<FP>(
492  state: &mut OpState,
493  #[serde] method: ByteString,
494  #[string] url: String,
495  #[serde] headers: Vec<(ByteString, ByteString)>,
496  #[smi] client_rid: Option<u32>,
497  has_body: bool,
498  #[buffer] data: Option<JsBuffer>,
499  #[smi] resource: Option<ResourceId>,
500) -> Result<FetchReturn, FetchError>
501where
502  FP: FetchPermissions + 'static,
503{
504  let (client, allow_host) = if let Some(rid) = client_rid {
505    let r = state.resource_table.get::<HttpClientResource>(rid)?;
506    (r.client.clone(), r.allow_host)
507  } else {
508    (get_or_create_client_from_state(state)?, false)
509  };
510
511  let method = Method::from_bytes(&method)?;
512  let mut url = Url::parse(&url)?;
513
514  // Check scheme before asking for net permission
515  let scheme = url.scheme();
516  let (request_rid, cancel_handle_rid) = match scheme {
517    "file" => {
518      if method != Method::GET {
519        return Err(FetchError::FsNotGet(method));
520      }
521      let Options {
522        file_fetch_handler, ..
523      } = state.borrow_mut::<Options>();
524      let file_fetch_handler = file_fetch_handler.clone();
525      let (future, maybe_cancel_handle) =
526        file_fetch_handler.fetch_file(state, &url);
527      let request_rid = state
528        .resource_table
529        .add(FetchRequestResource { future, url });
530      let maybe_cancel_handle_rid = maybe_cancel_handle
531        .map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
532
533      (request_rid, maybe_cancel_handle_rid)
534    }
535    "http" | "https" => {
536      let permissions = state.borrow_mut::<FP>();
537      permissions.check_net_url(&url, "fetch()")?;
538
539      let maybe_authority = extract_authority(&mut url);
540      let uri = url
541        .as_str()
542        .parse::<Uri>()
543        .map_err(|_| FetchError::InvalidUrl(url.clone()))?;
544
545      let mut con_len = None;
546      let body = if has_body {
547        match (data, resource) {
548          (Some(data), _) => {
549            // If a body is passed, we use it, and don't return a body for streaming.
550            con_len = Some(data.len() as u64);
551
552            ReqBody::full(data.to_vec().into())
553          }
554          (_, Some(resource)) => {
555            let resource = state.resource_table.take_any(resource)?;
556            match resource.size_hint() {
557              (body_size, Some(n)) if body_size == n && body_size > 0 => {
558                con_len = Some(body_size);
559              }
560              _ => {}
561            }
562            ReqBody::streaming(ResourceToBodyAdapter::new(resource))
563          }
564          (None, None) => unreachable!(),
565        }
566      } else {
567        // POST and PUT requests should always have a 0 length content-length,
568        // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
569        if matches!(method, Method::POST | Method::PUT) {
570          con_len = Some(0);
571        }
572        ReqBody::empty()
573      };
574
575      let mut request = http::Request::new(body);
576      *request.method_mut() = method.clone();
577      *request.uri_mut() = uri.clone();
578
579      if let Some((username, password)) = maybe_authority {
580        request.headers_mut().insert(
581          AUTHORIZATION,
582          proxy::basic_auth(&username, password.as_deref()),
583        );
584      }
585      if let Some(len) = con_len {
586        request.headers_mut().insert(CONTENT_LENGTH, len.into());
587      }
588
589      for (key, value) in headers {
590        let name = HeaderName::from_bytes(&key)?;
591        let v = HeaderValue::from_bytes(&value)?;
592
593        if (name != HOST || allow_host) && name != CONTENT_LENGTH {
594          request.headers_mut().append(name, v);
595        }
596      }
597
598      if request.headers().contains_key(RANGE) {
599        // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18
600        // If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`)
601        request
602          .headers_mut()
603          .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
604      }
605
606      let options = state.borrow::<Options>();
607      if let Some(request_builder_hook) = options.request_builder_hook {
608        request_builder_hook(&mut request)
609          .map_err(FetchError::RequestBuilderHook)?;
610      }
611
612      let cancel_handle = CancelHandle::new_rc();
613      let cancel_handle_ = cancel_handle.clone();
614
615      let fut = async move {
616        client
617          .send(request)
618          .map_err(Into::into)
619          .or_cancel(cancel_handle_)
620          .await
621      };
622
623      let request_rid = state.resource_table.add(FetchRequestResource {
624        future: Box::pin(fut),
625        url,
626      });
627
628      let cancel_handle_rid =
629        state.resource_table.add(FetchCancelHandle(cancel_handle));
630
631      (request_rid, Some(cancel_handle_rid))
632    }
633    "data" => {
634      let data_url =
635        DataUrl::process(url.as_str()).map_err(FetchError::DataUrl)?;
636
637      let (body, _) = data_url.decode_to_vec().map_err(FetchError::Base64)?;
638      let body = http_body_util::Full::new(body.into())
639        .map_err(|never| match never {})
640        .boxed();
641
642      let response = http::Response::builder()
643        .status(http::StatusCode::OK)
644        .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
645        .body(body)?;
646
647      let fut = async move { Ok(Ok(response)) };
648
649      let request_rid = state.resource_table.add(FetchRequestResource {
650        future: Box::pin(fut),
651        url,
652      });
653
654      (request_rid, None)
655    }
656    "blob" => {
657      // Blob URL resolution happens in the JS side of fetch. If we got here is
658      // because the URL isn't an object URL.
659      return Err(FetchError::BlobNotFound);
660    }
661    _ => return Err(FetchError::SchemeNotSupported(scheme.to_string())),
662  };
663
664  Ok(FetchReturn {
665    request_rid,
666    cancel_handle_rid,
667  })
668}
669
670#[derive(Default, Serialize)]
671#[serde(rename_all = "camelCase")]
672pub struct FetchResponse {
673  pub status: u16,
674  pub status_text: String,
675  pub headers: Vec<(ByteString, ByteString)>,
676  pub url: String,
677  pub response_rid: ResourceId,
678  pub content_length: Option<u64>,
679  /// This field is populated if some error occurred which needs to be
680  /// reconstructed in the JS side to set the error _cause_.
681  /// In the tuple, the first element is an error message and the second one is
682  /// an error cause.
683  pub error: Option<(String, String)>,
684}
685
686#[op2(async)]
687#[serde]
688pub async fn op_fetch_send(
689  state: Rc<RefCell<OpState>>,
690  #[smi] rid: ResourceId,
691) -> Result<FetchResponse, FetchError> {
692  let request = state
693    .borrow_mut()
694    .resource_table
695    .take::<FetchRequestResource>(rid)?;
696
697  let request = Rc::try_unwrap(request)
698    .ok()
699    .expect("multiple op_fetch_send ongoing");
700
701  let res = match request.future.await {
702    Ok(Ok(res)) => res,
703    Ok(Err(err)) => {
704      // We're going to try and rescue the error cause from a stream and return it from this fetch.
705      // If any error in the chain is a hyper body error, return that as a special result we can use to
706      // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
707      // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
708
709      if let FetchError::ClientSend(err_src) = &err
710        && let Some(client_err) = std::error::Error::source(&err_src.source)
711        && let Some(err_src) = client_err.downcast_ref::<hyper::Error>()
712        && let Some(err_src) = std::error::Error::source(err_src)
713      {
714        return Ok(FetchResponse {
715          error: Some((err.to_string(), err_src.to_string())),
716          ..Default::default()
717        });
718      }
719
720      return Err(err);
721    }
722    Err(_) => return Err(FetchError::RequestCanceled),
723  };
724
725  let status = res.status();
726  let url = request.url.into();
727  let mut res_headers = Vec::new();
728  for (key, val) in res.headers().iter() {
729    res_headers.push((key.as_str().into(), val.as_bytes().into()));
730  }
731
732  let content_length = hyper::body::Body::size_hint(res.body()).exact();
733
734  let response_rid = state
735    .borrow_mut()
736    .resource_table
737    .add(FetchResponseResource::new(res, content_length));
738
739  Ok(FetchResponse {
740    status: status.as_u16(),
741    status_text: status.canonical_reason().unwrap_or("").to_string(),
742    headers: res_headers,
743    url,
744    response_rid,
745    content_length,
746    error: None,
747  })
748}
749
750type CancelableResponseResult =
751  Result<Result<http::Response<ResBody>, FetchError>, Canceled>;
752
753pub struct FetchRequestResource {
754  pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
755  pub url: Url,
756}
757
758impl Resource for FetchRequestResource {
759  fn name(&self) -> Cow<'_, str> {
760    "fetchRequest".into()
761  }
762}
763
764pub struct FetchCancelHandle(pub Rc<CancelHandle>);
765
766impl Resource for FetchCancelHandle {
767  fn name(&self) -> Cow<'_, str> {
768    "fetchCancelHandle".into()
769  }
770
771  fn close(self: Rc<Self>) {
772    self.0.cancel()
773  }
774}
775
776type BytesStream =
777  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
778
779pub enum FetchResponseReader {
780  Start(http::Response<ResBody>),
781  BodyReader(Peekable<BytesStream>),
782}
783
784impl Default for FetchResponseReader {
785  fn default() -> Self {
786    let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
787    Self::BodyReader(stream.peekable())
788  }
789}
790#[derive(Debug)]
791pub struct FetchResponseResource {
792  pub response_reader: AsyncRefCell<FetchResponseReader>,
793  pub cancel: CancelHandle,
794  pub size: Option<u64>,
795}
796
797impl FetchResponseResource {
798  pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
799    Self {
800      response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
801      cancel: CancelHandle::default(),
802      size,
803    }
804  }
805
806  pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
807    let reader = self.response_reader.into_inner();
808    match reader {
809      FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
810      _ => unreachable!(),
811    }
812  }
813}
814
815impl Resource for FetchResponseResource {
816  fn name(&self) -> Cow<'_, str> {
817    "fetchResponse".into()
818  }
819
820  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
821    Box::pin(async move {
822      let mut reader =
823        RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
824
825      let body = loop {
826        match &mut *reader {
827          FetchResponseReader::BodyReader(reader) => break reader,
828          FetchResponseReader::Start(_) => {}
829        }
830
831        match std::mem::take(&mut *reader) {
832          FetchResponseReader::Start(resp) => {
833            let stream: BytesStream = Box::pin(
834              resp
835                .into_body()
836                .into_data_stream()
837                .map(|r| r.map_err(std::io::Error::other)),
838            );
839            *reader = FetchResponseReader::BodyReader(stream.peekable());
840          }
841          FetchResponseReader::BodyReader(_) => unreachable!(),
842        }
843      };
844      let fut = async move {
845        let mut reader = Pin::new(body);
846        loop {
847          match reader.as_mut().peek_mut().await {
848            Some(Ok(chunk)) if !chunk.is_empty() => {
849              let len = min(limit, chunk.len());
850              let chunk = chunk.split_to(len);
851              break Ok(chunk.into());
852            }
853            // This unwrap is safe because `peek_mut()` returned `Some`, and thus
854            // currently has a peeked value that can be synchronously returned
855            // from `next()`.
856            //
857            // The future returned from `next()` is always ready, so we can
858            // safely call `await` on it without creating a race condition.
859            Some(_) => match reader.as_mut().next().await.unwrap() {
860              Ok(chunk) => assert!(chunk.is_empty()),
861              Err(err) => break Err(JsErrorBox::type_error(err.to_string())),
862            },
863            None => break Ok(BufView::empty()),
864          }
865        }
866      };
867
868      let cancel_handle = RcRef::map(self, |r| &r.cancel);
869      fut
870        .try_or_cancel(cancel_handle)
871        .await
872        .map_err(JsErrorBox::from_err)
873    })
874  }
875
876  fn size_hint(&self) -> (u64, Option<u64>) {
877    (self.size.unwrap_or(0), self.size)
878  }
879
880  fn close(self: Rc<Self>) {
881    self.cancel.cancel()
882  }
883}
884
885pub struct HttpClientResource {
886  pub client: Client,
887  pub allow_host: bool,
888}
889
890impl Resource for HttpClientResource {
891  fn name(&self) -> Cow<'_, str> {
892    "httpClient".into()
893  }
894}
895
896impl HttpClientResource {
897  fn new(client: Client, allow_host: bool) -> Self {
898    Self { client, allow_host }
899  }
900}
901
902#[derive(Deserialize, Debug)]
903#[serde(rename_all = "camelCase")]
904pub struct CreateHttpClientArgs {
905  ca_certs: Vec<String>,
906  proxy: Option<Proxy>,
907  pool_max_idle_per_host: Option<usize>,
908  pool_idle_timeout: Option<serde_json::Value>,
909  #[serde(default = "default_true")]
910  http1: bool,
911  #[serde(default = "default_true")]
912  http2: bool,
913  #[serde(default)]
914  allow_host: bool,
915  local_address: Option<String>,
916}
917
918fn default_true() -> bool {
919  true
920}
921
922#[op2(stack_trace)]
923#[smi]
924#[allow(clippy::result_large_err)]
925pub fn op_fetch_custom_client<FP>(
926  state: &mut OpState,
927  #[serde] mut args: CreateHttpClientArgs,
928  #[cppgc] tls_keys: &TlsKeysHolder,
929) -> Result<ResourceId, FetchError>
930where
931  FP: FetchPermissions + 'static,
932{
933  if let Some(proxy) = &mut args.proxy {
934    let permissions = state.borrow_mut::<FP>();
935    match proxy {
936      Proxy::Http { url, .. } => {
937        let url = Url::parse(url)?;
938        permissions.check_net_url(&url, "Deno.createHttpClient()")?;
939      }
940      Proxy::Tcp { hostname, port } => {
941        permissions.check_net(hostname, *port, "Deno.createHttpClient()")?;
942      }
943      Proxy::Unix {
944        path: original_path,
945      } => {
946        let path = Path::new(original_path);
947        let resolved_path = permissions
948          .check_open(
949            Cow::Borrowed(path),
950            OpenAccessKind::ReadWriteNoFollow,
951            "Deno.createHttpClient()",
952          )?
953          .into_path();
954        if path != resolved_path {
955          *original_path = resolved_path.to_string_lossy().into_owned();
956        }
957      }
958      Proxy::Vsock { cid, port } => {
959        let permissions = state.borrow_mut::<FP>();
960        permissions.check_net_vsock(*cid, *port, "Deno.createHttpClient()")?;
961      }
962    }
963  }
964
965  let options = state.borrow::<Options>();
966  let ca_certs = args
967    .ca_certs
968    .into_iter()
969    .map(|cert| cert.into_bytes())
970    .collect::<Vec<_>>();
971
972  let client = create_http_client(
973    &options.user_agent,
974    CreateHttpClientOptions {
975      root_cert_store: options
976        .root_cert_store()
977        .map_err(HttpClientCreateError::RootCertStore)?,
978      ca_certs,
979      proxy: args.proxy,
980      dns_resolver: dns::Resolver::default(),
981      unsafely_ignore_certificate_errors: options
982        .unsafely_ignore_certificate_errors
983        .clone(),
984      client_cert_chain_and_key: tls_keys.take().try_into().unwrap(),
985      pool_max_idle_per_host: args.pool_max_idle_per_host,
986      pool_idle_timeout: args.pool_idle_timeout.and_then(
987        |timeout| match timeout {
988          serde_json::Value::Bool(true) => None,
989          serde_json::Value::Bool(false) => Some(None),
990          serde_json::Value::Number(specify) => {
991            Some(Some(specify.as_u64().unwrap_or_default()))
992          }
993          _ => Some(None),
994        },
995      ),
996      http1: args.http1,
997      http2: args.http2,
998      local_address: args.local_address,
999      client_builder_hook: options.client_builder_hook,
1000    },
1001  )?;
1002
1003  let rid = state
1004    .resource_table
1005    .add(HttpClientResource::new(client, args.allow_host));
1006  Ok(rid)
1007}
1008
1009#[derive(Debug, Clone)]
1010pub struct CreateHttpClientOptions {
1011  pub root_cert_store: Option<RootCertStore>,
1012  pub ca_certs: Vec<Vec<u8>>,
1013  pub proxy: Option<Proxy>,
1014  pub dns_resolver: dns::Resolver,
1015  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
1016  pub client_cert_chain_and_key: Option<TlsKey>,
1017  pub pool_max_idle_per_host: Option<usize>,
1018  pub pool_idle_timeout: Option<Option<u64>>,
1019  pub http1: bool,
1020  pub http2: bool,
1021  pub local_address: Option<String>,
1022  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
1023}
1024
1025impl Default for CreateHttpClientOptions {
1026  fn default() -> Self {
1027    CreateHttpClientOptions {
1028      root_cert_store: None,
1029      ca_certs: vec![],
1030      proxy: None,
1031      dns_resolver: dns::Resolver::default(),
1032      unsafely_ignore_certificate_errors: None,
1033      client_cert_chain_and_key: None,
1034      pool_max_idle_per_host: None,
1035      pool_idle_timeout: None,
1036      http1: true,
1037      http2: true,
1038      local_address: None,
1039      client_builder_hook: None,
1040    }
1041  }
1042}
1043
1044#[derive(Debug, thiserror::Error, deno_error::JsError)]
1045#[class(type)]
1046pub enum HttpClientCreateError {
1047  #[error(transparent)]
1048  Tls(deno_tls::TlsError),
1049  #[error("Illegal characters in User-Agent: received {0}")]
1050  InvalidUserAgent(String),
1051  #[error("Invalid address: {0}")]
1052  InvalidAddress(String),
1053  #[error("invalid proxy url")]
1054  InvalidProxyUrl,
1055  #[error(
1056    "Cannot create Http Client: either `http1` or `http2` needs to be set to true"
1057  )]
1058  HttpVersionSelectionInvalid,
1059  #[class(inherit)]
1060  #[error(transparent)]
1061  RootCertStore(JsErrorBox),
1062  #[error("Unix proxy is not supported on Windows")]
1063  UnixProxyNotSupportedOnWindows,
1064  #[error("Vsock proxy is not supported on this platform")]
1065  VsockProxyNotSupported,
1066}
1067
1068/// Create new instance of async Client. This client supports
1069/// proxies and doesn't follow redirects.
1070pub fn create_http_client(
1071  user_agent: &str,
1072  options: CreateHttpClientOptions,
1073) -> Result<Client, HttpClientCreateError> {
1074  let mut tls_config =
1075    deno_tls::create_client_config(deno_tls::TlsClientConfigOptions {
1076      root_cert_store: options.root_cert_store,
1077      ca_certs: options.ca_certs,
1078      unsafely_ignore_certificate_errors: options
1079        .unsafely_ignore_certificate_errors,
1080      unsafely_disable_hostname_verification: false,
1081      cert_chain_and_key: options.client_cert_chain_and_key.into(),
1082      socket_use: deno_tls::SocketUse::Http,
1083    })
1084    .map_err(HttpClientCreateError::Tls)?;
1085
1086  // Proxy TLS should not send ALPN
1087  tls_config.alpn_protocols.clear();
1088  let proxy_tls_config = Arc::from(tls_config.clone());
1089
1090  let mut alpn_protocols = vec![];
1091  if options.http2 {
1092    alpn_protocols.push("h2".into());
1093  }
1094  if options.http1 {
1095    alpn_protocols.push("http/1.1".into());
1096  }
1097  tls_config.alpn_protocols = alpn_protocols;
1098  let tls_config = Arc::from(tls_config);
1099
1100  let mut http_connector =
1101    HttpConnector::new_with_resolver(options.dns_resolver.clone());
1102  http_connector.enforce_http(false);
1103  if let Some(local_address) = options.local_address {
1104    let local_addr = local_address
1105      .parse::<IpAddr>()
1106      .map_err(|_| HttpClientCreateError::InvalidAddress(local_address))?;
1107    http_connector.set_local_address(Some(local_addr));
1108  }
1109
1110  let user_agent = user_agent.parse::<HeaderValue>().map_err(|_| {
1111    HttpClientCreateError::InvalidUserAgent(user_agent.to_string())
1112  })?;
1113
1114  let mut builder = HyperClientBuilder::new(TokioExecutor::new());
1115  builder.timer(TokioTimer::new());
1116  builder.pool_timer(TokioTimer::new());
1117
1118  if let Some(client_builder_hook) = options.client_builder_hook {
1119    builder = client_builder_hook(builder);
1120  }
1121
1122  let mut proxies = proxy::from_env();
1123  if let Some(proxy) = options.proxy {
1124    let intercept = match proxy {
1125      Proxy::Http { url, basic_auth } => {
1126        let target = proxy::Target::parse(&url)
1127          .ok_or_else(|| HttpClientCreateError::InvalidProxyUrl)?;
1128        let mut intercept = proxy::Intercept::all(target);
1129        if let Some(basic_auth) = &basic_auth {
1130          intercept.set_auth(&basic_auth.username, &basic_auth.password);
1131        }
1132        intercept
1133      }
1134      Proxy::Tcp {
1135        hostname: host,
1136        port,
1137      } => {
1138        let target = proxy::Target::new_tcp(host, port);
1139        proxy::Intercept::all(target)
1140      }
1141      #[cfg(not(windows))]
1142      Proxy::Unix { path } => {
1143        let target = proxy::Target::new_unix(PathBuf::from(path));
1144        proxy::Intercept::all(target)
1145      }
1146      #[cfg(windows)]
1147      Proxy::Unix { .. } => {
1148        return Err(HttpClientCreateError::UnixProxyNotSupportedOnWindows);
1149      }
1150      #[cfg(any(
1151        target_os = "android",
1152        target_os = "linux",
1153        target_os = "macos"
1154      ))]
1155      Proxy::Vsock { cid, port } => {
1156        let target = proxy::Target::new_vsock(cid, port);
1157        proxy::Intercept::all(target)
1158      }
1159      #[cfg(not(any(
1160        target_os = "android",
1161        target_os = "linux",
1162        target_os = "macos"
1163      )))]
1164      Proxy::Vsock { .. } => {
1165        return Err(HttpClientCreateError::VsockProxyNotSupported);
1166      }
1167    };
1168    proxies.prepend(intercept);
1169  }
1170  let proxies = Arc::new(proxies);
1171  let connector = proxy::ProxyConnector {
1172    http: http_connector,
1173    proxies,
1174    tls: tls_config,
1175    tls_proxy: proxy_tls_config,
1176    user_agent: Some(user_agent.clone()),
1177  };
1178
1179  if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
1180    builder.pool_max_idle_per_host(pool_max_idle_per_host);
1181  }
1182
1183  if let Some(pool_idle_timeout) = options.pool_idle_timeout {
1184    builder.pool_idle_timeout(
1185      pool_idle_timeout.map(std::time::Duration::from_millis),
1186    );
1187  }
1188
1189  match (options.http1, options.http2) {
1190    (true, false) => {} // noop, handled by ALPN above
1191    (false, true) => {
1192      builder.http2_only(true);
1193    }
1194    (true, true) => {}
1195    (false, false) => {
1196      return Err(HttpClientCreateError::HttpVersionSelectionInvalid);
1197    }
1198  }
1199
1200  let pooled_client = builder.build(connector.clone());
1201  let retry_client = retry::Retry::new(FetchRetry, pooled_client);
1202  let decompress = Decompression::new(retry_client).gzip(true).br(true);
1203
1204  Ok(Client {
1205    inner: decompress,
1206    connector,
1207    user_agent,
1208  })
1209}
1210
1211#[op2]
1212#[serde]
1213pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString {
1214  input.into()
1215}
1216
1217#[derive(Clone, Debug)]
1218pub struct Client {
1219  inner: Decompression<
1220    retry::Retry<
1221      FetchRetry,
1222      hyper_util::client::legacy::Client<Connector, ReqBody>,
1223    >,
1224  >,
1225  connector: Connector,
1226  user_agent: HeaderValue,
1227}
1228
1229#[derive(Debug, thiserror::Error, deno_error::JsError)]
1230pub enum ClientConnectError {
1231  #[class(type)]
1232  #[error("HTTP/1.1 not supported by this client")]
1233  Http1NotSupported,
1234  #[class(type)]
1235  #[error("HTTP/2 not supported by this client")]
1236  Http2NotSupported,
1237  #[class(generic)]
1238  #[error(transparent)]
1239  Connector(BoxError),
1240}
1241
1242impl Client {
1243  pub async fn connect(
1244    &self,
1245    uri: Uri,
1246    socket_use: SocketUse,
1247  ) -> Result<
1248    impl tokio::io::AsyncRead
1249    + tokio::io::AsyncWrite
1250    + Connection
1251    + Unpin
1252    + Send
1253    + 'static,
1254    ClientConnectError,
1255  > {
1256    let mut connector = match socket_use {
1257      SocketUse::Http1Only => {
1258        let Some(connector) = self.connector.clone().h1_only() else {
1259          return Err(ClientConnectError::Http1NotSupported);
1260        };
1261        connector
1262      }
1263      SocketUse::Http2Only => {
1264        let Some(connector) = self.connector.clone().h2_only() else {
1265          return Err(ClientConnectError::Http2NotSupported);
1266        };
1267        connector
1268      }
1269      _ => self.connector.clone(),
1270    };
1271    let connection = connector
1272      .call(uri)
1273      .await
1274      .map_err(ClientConnectError::Connector)?;
1275    Ok(TokioIo::new(connection))
1276  }
1277}
1278
1279type Connector = proxy::ProxyConnector<HttpConnector<dns::Resolver>>;
1280
1281// clippy is wrong here
1282#[allow(clippy::declare_interior_mutable_const)]
1283const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*");
1284
1285#[derive(Debug, deno_error::JsError)]
1286#[class(type)]
1287pub struct ClientSendError {
1288  uri: Uri,
1289  pub source: hyper_util::client::legacy::Error,
1290}
1291
1292impl ClientSendError {
1293  pub fn is_connect_error(&self) -> bool {
1294    self.source.is_connect()
1295  }
1296
1297  fn http_info(&self) -> Option<HttpInfo> {
1298    let mut exts = Extensions::new();
1299    self.source.connect_info()?.get_extras(&mut exts);
1300    exts.remove::<HttpInfo>()
1301  }
1302}
1303
1304impl std::fmt::Display for ClientSendError {
1305  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1306    // NOTE: we can use `std::error::Report` instead once it's stabilized.
1307    let detail = error_reporter::Report::new(&self.source);
1308
1309    match self.http_info() {
1310      Some(http_info) => {
1311        write!(
1312          f,
1313          "error sending request from {src} for {uri} ({dst}): {detail}",
1314          src = http_info.local_addr(),
1315          uri = self.uri,
1316          dst = http_info.remote_addr(),
1317          detail = detail,
1318        )
1319      }
1320      None => {
1321        write!(
1322          f,
1323          "error sending request for url ({uri}): {detail}",
1324          uri = self.uri,
1325          detail = detail,
1326        )
1327      }
1328    }
1329  }
1330}
1331
1332impl std::error::Error for ClientSendError {
1333  fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1334    Some(&self.source)
1335  }
1336}
1337
1338pub trait CommonRequest {
1339  fn uri(&self) -> &Uri;
1340  fn headers_mut(&mut self) -> &mut HeaderMap;
1341}
1342
1343impl CommonRequest for http::Request<ReqBody> {
1344  fn uri(&self) -> &Uri {
1345    self.uri()
1346  }
1347
1348  fn headers_mut(&mut self) -> &mut HeaderMap {
1349    self.headers_mut()
1350  }
1351}
1352
1353impl CommonRequest for http::request::Builder {
1354  fn uri(&self) -> &Uri {
1355    http::request::Builder::uri_ref(self).expect("uri not set")
1356  }
1357
1358  fn headers_mut(&mut self) -> &mut HeaderMap {
1359    http::request::Builder::headers_mut(self).expect("headers not set")
1360  }
1361}
1362
1363impl Client {
1364  /// Injects common headers like User-Agent and Proxy-Authorization.
1365  pub fn inject_common_headers(&self, req: &mut impl CommonRequest) {
1366    req
1367      .headers_mut()
1368      .entry(USER_AGENT)
1369      .or_insert_with(|| self.user_agent.clone());
1370
1371    if let Some(auth) = self.connector.proxies.http_forward_auth(req.uri()) {
1372      req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
1373    }
1374  }
1375
1376  pub async fn send(
1377    self,
1378    mut req: http::Request<ReqBody>,
1379  ) -> Result<http::Response<ResBody>, ClientSendError> {
1380    self.inject_common_headers(&mut req);
1381
1382    req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
1383
1384    let uri = req.uri().clone();
1385
1386    let resp = self
1387      .inner
1388      .oneshot(req)
1389      .await
1390      .map_err(|e| ClientSendError { uri, source: e })?;
1391    Ok(resp.map(|b| b.map_err(|e| JsErrorBox::generic(e.to_string())).boxed()))
1392  }
1393}
1394
1395// This is a custom enum to allow the retry policy to clone the variants that could be retried.
1396pub enum ReqBody {
1397  Full(http_body_util::Full<Bytes>),
1398  Empty(http_body_util::Empty<Bytes>),
1399  Streaming(BoxBody<Bytes, JsErrorBox>),
1400}
1401
1402pub type ResBody = BoxBody<Bytes, JsErrorBox>;
1403
1404impl ReqBody {
1405  pub fn full(bytes: Bytes) -> Self {
1406    ReqBody::Full(http_body_util::Full::new(bytes))
1407  }
1408
1409  pub fn empty() -> Self {
1410    ReqBody::Empty(http_body_util::Empty::new())
1411  }
1412
1413  pub fn streaming<B>(body: B) -> Self
1414  where
1415    B: hyper::body::Body<Data = Bytes, Error = JsErrorBox>
1416      + Send
1417      + Sync
1418      + 'static,
1419  {
1420    ReqBody::Streaming(BoxBody::new(body))
1421  }
1422}
1423
1424impl hyper::body::Body for ReqBody {
1425  type Data = Bytes;
1426  type Error = JsErrorBox;
1427
1428  fn poll_frame(
1429    mut self: Pin<&mut Self>,
1430    cx: &mut Context<'_>,
1431  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1432    match &mut *self {
1433      ReqBody::Full(b) => {
1434        Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1435      }
1436      ReqBody::Empty(b) => {
1437        Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1438      }
1439      ReqBody::Streaming(b) => Pin::new(b).poll_frame(cx),
1440    }
1441  }
1442
1443  fn is_end_stream(&self) -> bool {
1444    match self {
1445      ReqBody::Full(b) => b.is_end_stream(),
1446      ReqBody::Empty(b) => b.is_end_stream(),
1447      ReqBody::Streaming(b) => b.is_end_stream(),
1448    }
1449  }
1450
1451  fn size_hint(&self) -> hyper::body::SizeHint {
1452    match self {
1453      ReqBody::Full(b) => b.size_hint(),
1454      ReqBody::Empty(b) => b.size_hint(),
1455      ReqBody::Streaming(b) => b.size_hint(),
1456    }
1457  }
1458}
1459
1460/// Copied from https://github.com/seanmonstar/reqwest/blob/b9d62a0323d96f11672a61a17bf8849baec00275/src/async_impl/request.rs#L572
1461/// Check the request URL for a "username:password" type authority, and if
1462/// found, remove it from the URL and return it.
1463pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
1464  use percent_encoding::percent_decode;
1465
1466  if url.has_authority() {
1467    let username: String = percent_decode(url.username().as_bytes())
1468      .decode_utf8()
1469      .ok()?
1470      .into();
1471    let password = url.password().and_then(|pass| {
1472      percent_decode(pass.as_bytes())
1473        .decode_utf8()
1474        .ok()
1475        .map(String::from)
1476    });
1477    if !username.is_empty() || password.is_some() {
1478      url
1479        .set_username("")
1480        .expect("has_authority means set_username shouldn't fail");
1481      url
1482        .set_password(None)
1483        .expect("has_authority means set_password shouldn't fail");
1484      return Some((username, password));
1485    }
1486  }
1487
1488  None
1489}
1490
1491#[op2(fast)]
1492fn op_fetch_promise_is_settled(promise: v8::Local<v8::Promise>) -> bool {
1493  promise.state() != v8::PromiseState::Pending
1494}
1495
1496/// Deno.fetch's retry policy.
1497#[derive(Clone, Debug)]
1498struct FetchRetry;
1499
1500/// Marker extension that a request has been retried once.
1501#[derive(Clone, Debug)]
1502struct Retried;
1503
1504impl<ResBody, E>
1505  retry::Policy<http::Request<ReqBody>, http::Response<ResBody>, E>
1506  for FetchRetry
1507where
1508  E: std::error::Error + 'static,
1509{
1510  /// Don't delay retries.
1511  type Future = future::Ready<()>;
1512
1513  fn retry(
1514    &mut self,
1515    req: &mut http::Request<ReqBody>,
1516    result: &mut Result<http::Response<ResBody>, E>,
1517  ) -> Option<Self::Future> {
1518    if req.extensions().get::<Retried>().is_some() {
1519      // only retry once
1520      return None;
1521    }
1522
1523    match result {
1524      Ok(..) => {
1525        // never retry a Response
1526        None
1527      }
1528      Err(err) => {
1529        if is_error_retryable(&*err) {
1530          req.extensions_mut().insert(Retried);
1531          Some(future::ready(()))
1532        } else {
1533          None
1534        }
1535      }
1536    }
1537  }
1538
1539  fn clone_request(
1540    &mut self,
1541    req: &http::Request<ReqBody>,
1542  ) -> Option<http::Request<ReqBody>> {
1543    let body = match req.body() {
1544      ReqBody::Full(b) => ReqBody::Full(b.clone()),
1545      ReqBody::Empty(b) => ReqBody::Empty(*b),
1546      ReqBody::Streaming(..) => return None,
1547    };
1548
1549    let mut clone = http::Request::new(body);
1550    *clone.method_mut() = req.method().clone();
1551    *clone.uri_mut() = req.uri().clone();
1552    *clone.headers_mut() = req.headers().clone();
1553    *clone.extensions_mut() = req.extensions().clone();
1554    Some(clone)
1555  }
1556}
1557
1558fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool {
1559  // Note: hyper doesn't promise it will always be this h2 version. Keep up to date.
1560  if let Some(err) = find_source::<h2::Error>(err) {
1561    // They sent us a graceful shutdown, try with a new connection!
1562    if err.is_go_away()
1563      && err.is_remote()
1564      && err.reason() == Some(h2::Reason::NO_ERROR)
1565    {
1566      return true;
1567    }
1568
1569    // REFUSED_STREAM was sent from the server, which is safe to retry.
1570    // https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2
1571    if err.is_reset()
1572      && err.is_remote()
1573      && err.reason() == Some(h2::Reason::REFUSED_STREAM)
1574    {
1575      return true;
1576    }
1577  }
1578
1579  false
1580}
1581
1582fn find_source<'a, E: std::error::Error + 'static>(
1583  err: &'a (dyn std::error::Error + 'static),
1584) -> Option<&'a E> {
1585  let mut err = Some(err);
1586  while let Some(src) = err {
1587    if let Some(found) = src.downcast_ref::<E>() {
1588      return Some(found);
1589    }
1590    err = src.source();
1591  }
1592  None
1593}