deno_fetch/
lib.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3pub mod dns;
4mod fs_fetch_handler;
5mod proxy;
6#[cfg(test)]
7mod tests;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::cmp::min;
12use std::convert::From;
13use std::future;
14use std::future::Future;
15use std::path::Path;
16use std::pin::Pin;
17use std::rc::Rc;
18use std::sync::Arc;
19use std::task::Context;
20use std::task::Poll;
21
22use bytes::Bytes;
23// Re-export data_url
24pub use data_url;
25use data_url::DataUrl;
26use deno_core::futures::stream::Peekable;
27use deno_core::futures::FutureExt;
28use deno_core::futures::Stream;
29use deno_core::futures::StreamExt;
30use deno_core::futures::TryFutureExt;
31use deno_core::op2;
32use deno_core::url;
33use deno_core::url::Url;
34use deno_core::v8;
35use deno_core::AsyncRefCell;
36use deno_core::AsyncResult;
37use deno_core::BufView;
38use deno_core::ByteString;
39use deno_core::CancelFuture;
40use deno_core::CancelHandle;
41use deno_core::CancelTryFuture;
42use deno_core::Canceled;
43use deno_core::JsBuffer;
44use deno_core::OpState;
45use deno_core::RcRef;
46use deno_core::Resource;
47use deno_core::ResourceId;
48use deno_error::JsErrorBox;
49pub use deno_fs::FsError;
50use deno_path_util::PathToUrlError;
51use deno_permissions::PermissionCheckError;
52use deno_tls::rustls::RootCertStore;
53use deno_tls::Proxy;
54use deno_tls::RootCertStoreProvider;
55use deno_tls::TlsKey;
56use deno_tls::TlsKeys;
57use deno_tls::TlsKeysHolder;
58pub use fs_fetch_handler::FsFetchHandler;
59use http::header::HeaderName;
60use http::header::HeaderValue;
61use http::header::ACCEPT;
62use http::header::ACCEPT_ENCODING;
63use http::header::AUTHORIZATION;
64use http::header::CONTENT_LENGTH;
65use http::header::HOST;
66use http::header::PROXY_AUTHORIZATION;
67use http::header::RANGE;
68use http::header::USER_AGENT;
69use http::Extensions;
70use http::Method;
71use http::Uri;
72use http_body_util::combinators::BoxBody;
73use http_body_util::BodyExt;
74use hyper::body::Frame;
75use hyper_util::client::legacy::connect::HttpConnector;
76use hyper_util::client::legacy::connect::HttpInfo;
77use hyper_util::client::legacy::Builder as HyperClientBuilder;
78use hyper_util::rt::TokioExecutor;
79use hyper_util::rt::TokioTimer;
80pub use proxy::basic_auth;
81use serde::Deserialize;
82use serde::Serialize;
83use tower::retry;
84use tower::ServiceExt;
85use tower_http::decompression::Decompression;
86
87#[derive(Clone)]
88pub struct Options {
89  pub user_agent: String,
90  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
91  pub proxy: Option<Proxy>,
92  /// A callback to customize HTTP client configuration.
93  ///
94  /// The settings applied with this hook may be overridden by the options
95  /// provided through `Deno.createHttpClient()` API. For instance, if the hook
96  /// calls [`hyper_util::client::legacy::Builder::pool_max_idle_per_host`] with
97  /// a value of 99, and a user calls `Deno.createHttpClient({ poolMaxIdlePerHost: 42 })`,
98  /// the value that will take effect is 42.
99  ///
100  /// For more info on what can be configured, see [`hyper_util::client::legacy::Builder`].
101  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
102  #[allow(clippy::type_complexity)]
103  pub request_builder_hook:
104    Option<fn(&mut http::Request<ReqBody>) -> Result<(), JsErrorBox>>,
105  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
106  pub client_cert_chain_and_key: TlsKeys,
107  pub file_fetch_handler: Rc<dyn FetchHandler>,
108  pub resolver: dns::Resolver,
109}
110
111impl Options {
112  pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
113    Ok(match &self.root_cert_store_provider {
114      Some(provider) => Some(provider.get_or_try_init()?.clone()),
115      None => None,
116    })
117  }
118}
119
120impl Default for Options {
121  fn default() -> Self {
122    Self {
123      user_agent: "".to_string(),
124      root_cert_store_provider: None,
125      proxy: None,
126      client_builder_hook: None,
127      request_builder_hook: None,
128      unsafely_ignore_certificate_errors: None,
129      client_cert_chain_and_key: TlsKeys::Null,
130      file_fetch_handler: Rc::new(DefaultFileFetchHandler),
131      resolver: dns::Resolver::default(),
132    }
133  }
134}
135
136deno_core::extension!(deno_fetch,
137  deps = [ deno_webidl, deno_web, deno_url, deno_console ],
138  parameters = [FP: FetchPermissions],
139  ops = [
140    op_fetch<FP>,
141    op_fetch_send,
142    op_utf8_to_byte_string,
143    op_fetch_custom_client<FP>,
144    op_fetch_promise_is_settled,
145  ],
146  esm = [
147    "20_headers.js",
148    "21_formdata.js",
149    "22_body.js",
150    "22_http_client.js",
151    "23_request.js",
152    "23_response.js",
153    "26_fetch.js",
154    "27_eventsource.js"
155  ],
156  options = {
157    options: Options,
158  },
159  state = |state, options| {
160    state.put::<Options>(options.options);
161  },
162);
163
164#[derive(Debug, thiserror::Error, deno_error::JsError)]
165pub enum FetchError {
166  #[class(inherit)]
167  #[error(transparent)]
168  Resource(#[from] deno_core::error::ResourceError),
169  #[class(inherit)]
170  #[error(transparent)]
171  Permission(#[from] PermissionCheckError),
172  #[class(type)]
173  #[error("NetworkError when attempting to fetch resource")]
174  NetworkError,
175  #[class(type)]
176  #[error("Fetching files only supports the GET method: received {0}")]
177  FsNotGet(Method),
178  #[class(inherit)]
179  #[error(transparent)]
180  PathToUrl(#[from] PathToUrlError),
181  #[class(type)]
182  #[error("Invalid URL {0}")]
183  InvalidUrl(Url),
184  #[class(type)]
185  #[error(transparent)]
186  InvalidHeaderName(#[from] http::header::InvalidHeaderName),
187  #[class(type)]
188  #[error(transparent)]
189  InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
190  #[class(type)]
191  #[error("{0:?}")]
192  DataUrl(data_url::DataUrlError),
193  #[class(type)]
194  #[error("{0:?}")]
195  Base64(data_url::forgiving_base64::InvalidBase64),
196  #[class(type)]
197  #[error("Blob for the given URL not found.")]
198  BlobNotFound,
199  #[class(type)]
200  #[error("Url scheme '{0}' not supported")]
201  SchemeNotSupported(String),
202  #[class(type)]
203  #[error("Request was cancelled")]
204  RequestCanceled,
205  #[class(generic)]
206  #[error(transparent)]
207  Http(#[from] http::Error),
208  #[class(inherit)]
209  #[error(transparent)]
210  ClientCreate(#[from] HttpClientCreateError),
211  #[class(inherit)]
212  #[error(transparent)]
213  Url(#[from] url::ParseError),
214  #[class(type)]
215  #[error(transparent)]
216  Method(#[from] http::method::InvalidMethod),
217  #[class(inherit)]
218  #[error(transparent)]
219  ClientSend(#[from] ClientSendError),
220  #[class(inherit)]
221  #[error(transparent)]
222  RequestBuilderHook(JsErrorBox),
223  #[class(inherit)]
224  #[error(transparent)]
225  Io(#[from] std::io::Error),
226  #[class(generic)]
227  #[error(transparent)]
228  Dns(hickory_resolver::ResolveError),
229  #[class("NotCapable")]
230  #[error("requires {0} access")]
231  NotCapable(&'static str),
232}
233
234impl From<deno_fs::FsError> for FetchError {
235  fn from(value: deno_fs::FsError) -> Self {
236    match value {
237      deno_fs::FsError::Io(_)
238      | deno_fs::FsError::FileBusy
239      | deno_fs::FsError::NotSupported => FetchError::NetworkError,
240      deno_fs::FsError::NotCapable(err) => FetchError::NotCapable(err),
241    }
242  }
243}
244
245pub type CancelableResponseFuture =
246  Pin<Box<dyn Future<Output = CancelableResponseResult>>>;
247
248pub trait FetchHandler: dyn_clone::DynClone {
249  // Return the result of the fetch request consisting of a tuple of the
250  // cancelable response result, the optional fetch body resource and the
251  // optional cancel handle.
252  fn fetch_file(
253    &self,
254    state: &mut OpState,
255    url: &Url,
256  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
257}
258
259dyn_clone::clone_trait_object!(FetchHandler);
260
261/// A default implementation which will error for every request.
262#[derive(Clone)]
263pub struct DefaultFileFetchHandler;
264
265impl FetchHandler for DefaultFileFetchHandler {
266  fn fetch_file(
267    &self,
268    _state: &mut OpState,
269    _url: &Url,
270  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
271    let fut = async move { Ok(Err(FetchError::NetworkError)) };
272    (Box::pin(fut), None)
273  }
274}
275
276#[derive(Serialize)]
277#[serde(rename_all = "camelCase")]
278pub struct FetchReturn {
279  pub request_rid: ResourceId,
280  pub cancel_handle_rid: Option<ResourceId>,
281}
282
283pub fn get_or_create_client_from_state(
284  state: &mut OpState,
285) -> Result<Client, HttpClientCreateError> {
286  if let Some(client) = state.try_borrow::<Client>() {
287    Ok(client.clone())
288  } else {
289    let options = state.borrow::<Options>();
290    let client = create_client_from_options(options)?;
291    state.put::<Client>(client.clone());
292    Ok(client)
293  }
294}
295
296pub fn create_client_from_options(
297  options: &Options,
298) -> Result<Client, HttpClientCreateError> {
299  create_http_client(
300    &options.user_agent,
301    CreateHttpClientOptions {
302      root_cert_store: options
303        .root_cert_store()
304        .map_err(HttpClientCreateError::RootCertStore)?,
305      ca_certs: vec![],
306      proxy: options.proxy.clone(),
307      dns_resolver: options.resolver.clone(),
308      unsafely_ignore_certificate_errors: options
309        .unsafely_ignore_certificate_errors
310        .clone(),
311      client_cert_chain_and_key: options
312        .client_cert_chain_and_key
313        .clone()
314        .try_into()
315        .unwrap_or_default(),
316      pool_max_idle_per_host: None,
317      pool_idle_timeout: None,
318      http1: true,
319      http2: true,
320      client_builder_hook: options.client_builder_hook,
321    },
322  )
323}
324
325#[allow(clippy::type_complexity)]
326pub struct ResourceToBodyAdapter(
327  Rc<dyn Resource>,
328  Option<Pin<Box<dyn Future<Output = Result<BufView, JsErrorBox>>>>>,
329);
330
331impl ResourceToBodyAdapter {
332  pub fn new(resource: Rc<dyn Resource>) -> Self {
333    let future = resource.clone().read(64 * 1024);
334    Self(resource, Some(future))
335  }
336}
337
338// SAFETY: we only use this on a single-threaded executor
339unsafe impl Send for ResourceToBodyAdapter {}
340// SAFETY: we only use this on a single-threaded executor
341unsafe impl Sync for ResourceToBodyAdapter {}
342
343impl Stream for ResourceToBodyAdapter {
344  type Item = Result<Bytes, JsErrorBox>;
345
346  fn poll_next(
347    self: Pin<&mut Self>,
348    cx: &mut Context<'_>,
349  ) -> Poll<Option<Self::Item>> {
350    let this = self.get_mut();
351    if let Some(mut fut) = this.1.take() {
352      match fut.poll_unpin(cx) {
353        Poll::Pending => {
354          this.1 = Some(fut);
355          Poll::Pending
356        }
357        Poll::Ready(res) => match res {
358          Ok(buf) if buf.is_empty() => Poll::Ready(None),
359          Ok(buf) => {
360            this.1 = Some(this.0.clone().read(64 * 1024));
361            Poll::Ready(Some(Ok(buf.to_vec().into())))
362          }
363          Err(err) => Poll::Ready(Some(Err(err))),
364        },
365      }
366    } else {
367      Poll::Ready(None)
368    }
369  }
370}
371
372impl hyper::body::Body for ResourceToBodyAdapter {
373  type Data = Bytes;
374  type Error = JsErrorBox;
375
376  fn poll_frame(
377    self: Pin<&mut Self>,
378    cx: &mut Context<'_>,
379  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
380    match self.poll_next(cx) {
381      Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
382      Poll::Ready(None) => Poll::Ready(None),
383      Poll::Pending => Poll::Pending,
384    }
385  }
386}
387
388impl Drop for ResourceToBodyAdapter {
389  fn drop(&mut self) {
390    self.0.clone().close()
391  }
392}
393
394pub trait FetchPermissions {
395  fn check_net_url(
396    &mut self,
397    url: &Url,
398    api_name: &str,
399  ) -> Result<(), PermissionCheckError>;
400  #[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"]
401  fn check_read<'a>(
402    &mut self,
403    resolved: bool,
404    p: &'a Path,
405    api_name: &str,
406  ) -> Result<Cow<'a, Path>, FsError>;
407}
408
409impl FetchPermissions for deno_permissions::PermissionsContainer {
410  #[inline(always)]
411  fn check_net_url(
412    &mut self,
413    url: &Url,
414    api_name: &str,
415  ) -> Result<(), PermissionCheckError> {
416    deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
417  }
418
419  #[inline(always)]
420  fn check_read<'a>(
421    &mut self,
422    resolved: bool,
423    path: &'a Path,
424    api_name: &str,
425  ) -> Result<Cow<'a, Path>, FsError> {
426    if resolved {
427      self
428        .check_special_file(path, api_name)
429        .map_err(FsError::NotCapable)?;
430      return Ok(Cow::Borrowed(path));
431    }
432
433    deno_permissions::PermissionsContainer::check_read_path(
434      self,
435      path,
436      Some(api_name),
437    )
438    .map_err(|_| FsError::NotCapable("read"))
439  }
440}
441
442#[op2(stack_trace)]
443#[serde]
444#[allow(clippy::too_many_arguments)]
445pub fn op_fetch<FP>(
446  state: &mut OpState,
447  #[serde] method: ByteString,
448  #[string] url: String,
449  #[serde] headers: Vec<(ByteString, ByteString)>,
450  #[smi] client_rid: Option<u32>,
451  has_body: bool,
452  #[buffer] data: Option<JsBuffer>,
453  #[smi] resource: Option<ResourceId>,
454) -> Result<FetchReturn, FetchError>
455where
456  FP: FetchPermissions + 'static,
457{
458  let (client, allow_host) = if let Some(rid) = client_rid {
459    let r = state.resource_table.get::<HttpClientResource>(rid)?;
460    (r.client.clone(), r.allow_host)
461  } else {
462    (get_or_create_client_from_state(state)?, false)
463  };
464
465  let method = Method::from_bytes(&method)?;
466  let mut url = Url::parse(&url)?;
467
468  // Check scheme before asking for net permission
469  let scheme = url.scheme();
470  let (request_rid, cancel_handle_rid) = match scheme {
471    "file" => {
472      if method != Method::GET {
473        return Err(FetchError::FsNotGet(method));
474      }
475      let Options {
476        file_fetch_handler, ..
477      } = state.borrow_mut::<Options>();
478      let file_fetch_handler = file_fetch_handler.clone();
479      let (future, maybe_cancel_handle) =
480        file_fetch_handler.fetch_file(state, &url);
481      let request_rid = state
482        .resource_table
483        .add(FetchRequestResource { future, url });
484      let maybe_cancel_handle_rid = maybe_cancel_handle
485        .map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
486
487      (request_rid, maybe_cancel_handle_rid)
488    }
489    "http" | "https" => {
490      let permissions = state.borrow_mut::<FP>();
491      permissions.check_net_url(&url, "fetch()")?;
492
493      let maybe_authority = extract_authority(&mut url);
494      let uri = url
495        .as_str()
496        .parse::<Uri>()
497        .map_err(|_| FetchError::InvalidUrl(url.clone()))?;
498
499      let mut con_len = None;
500      let body = if has_body {
501        match (data, resource) {
502          (Some(data), _) => {
503            // If a body is passed, we use it, and don't return a body for streaming.
504            con_len = Some(data.len() as u64);
505
506            ReqBody::full(data.to_vec().into())
507          }
508          (_, Some(resource)) => {
509            let resource = state.resource_table.take_any(resource)?;
510            match resource.size_hint() {
511              (body_size, Some(n)) if body_size == n && body_size > 0 => {
512                con_len = Some(body_size);
513              }
514              _ => {}
515            }
516            ReqBody::streaming(ResourceToBodyAdapter::new(resource))
517          }
518          (None, None) => unreachable!(),
519        }
520      } else {
521        // POST and PUT requests should always have a 0 length content-length,
522        // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
523        if matches!(method, Method::POST | Method::PUT) {
524          con_len = Some(0);
525        }
526        ReqBody::empty()
527      };
528
529      let mut request = http::Request::new(body);
530      *request.method_mut() = method.clone();
531      *request.uri_mut() = uri.clone();
532
533      if let Some((username, password)) = maybe_authority {
534        request.headers_mut().insert(
535          AUTHORIZATION,
536          proxy::basic_auth(&username, password.as_deref()),
537        );
538      }
539      if let Some(len) = con_len {
540        request.headers_mut().insert(CONTENT_LENGTH, len.into());
541      }
542
543      for (key, value) in headers {
544        let name = HeaderName::from_bytes(&key)?;
545        let v = HeaderValue::from_bytes(&value)?;
546
547        if (name != HOST || allow_host) && name != CONTENT_LENGTH {
548          request.headers_mut().append(name, v);
549        }
550      }
551
552      if request.headers().contains_key(RANGE) {
553        // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18
554        // If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`)
555        request
556          .headers_mut()
557          .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
558      }
559
560      let options = state.borrow::<Options>();
561      if let Some(request_builder_hook) = options.request_builder_hook {
562        request_builder_hook(&mut request)
563          .map_err(FetchError::RequestBuilderHook)?;
564      }
565
566      let cancel_handle = CancelHandle::new_rc();
567      let cancel_handle_ = cancel_handle.clone();
568
569      let fut = async move {
570        client
571          .send(request)
572          .map_err(Into::into)
573          .or_cancel(cancel_handle_)
574          .await
575      };
576
577      let request_rid = state.resource_table.add(FetchRequestResource {
578        future: Box::pin(fut),
579        url,
580      });
581
582      let cancel_handle_rid =
583        state.resource_table.add(FetchCancelHandle(cancel_handle));
584
585      (request_rid, Some(cancel_handle_rid))
586    }
587    "data" => {
588      let data_url =
589        DataUrl::process(url.as_str()).map_err(FetchError::DataUrl)?;
590
591      let (body, _) = data_url.decode_to_vec().map_err(FetchError::Base64)?;
592      let body = http_body_util::Full::new(body.into())
593        .map_err(|never| match never {})
594        .boxed();
595
596      let response = http::Response::builder()
597        .status(http::StatusCode::OK)
598        .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
599        .body(body)?;
600
601      let fut = async move { Ok(Ok(response)) };
602
603      let request_rid = state.resource_table.add(FetchRequestResource {
604        future: Box::pin(fut),
605        url,
606      });
607
608      (request_rid, None)
609    }
610    "blob" => {
611      // Blob URL resolution happens in the JS side of fetch. If we got here is
612      // because the URL isn't an object URL.
613      return Err(FetchError::BlobNotFound);
614    }
615    _ => return Err(FetchError::SchemeNotSupported(scheme.to_string())),
616  };
617
618  Ok(FetchReturn {
619    request_rid,
620    cancel_handle_rid,
621  })
622}
623
624#[derive(Default, Serialize)]
625#[serde(rename_all = "camelCase")]
626pub struct FetchResponse {
627  pub status: u16,
628  pub status_text: String,
629  pub headers: Vec<(ByteString, ByteString)>,
630  pub url: String,
631  pub response_rid: ResourceId,
632  pub content_length: Option<u64>,
633  pub remote_addr_ip: Option<String>,
634  pub remote_addr_port: Option<u16>,
635  /// This field is populated if some error occurred which needs to be
636  /// reconstructed in the JS side to set the error _cause_.
637  /// In the tuple, the first element is an error message and the second one is
638  /// an error cause.
639  pub error: Option<(String, String)>,
640}
641
642#[op2(async)]
643#[serde]
644pub async fn op_fetch_send(
645  state: Rc<RefCell<OpState>>,
646  #[smi] rid: ResourceId,
647) -> Result<FetchResponse, FetchError> {
648  let request = state
649    .borrow_mut()
650    .resource_table
651    .take::<FetchRequestResource>(rid)?;
652
653  let request = Rc::try_unwrap(request)
654    .ok()
655    .expect("multiple op_fetch_send ongoing");
656
657  let res = match request.future.await {
658    Ok(Ok(res)) => res,
659    Ok(Err(err)) => {
660      // We're going to try and rescue the error cause from a stream and return it from this fetch.
661      // If any error in the chain is a hyper body error, return that as a special result we can use to
662      // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
663      // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
664
665      if let FetchError::ClientSend(err_src) = &err {
666        if let Some(client_err) = std::error::Error::source(&err_src.source) {
667          if let Some(err_src) = client_err.downcast_ref::<hyper::Error>() {
668            if let Some(err_src) = std::error::Error::source(err_src) {
669              return Ok(FetchResponse {
670                error: Some((err.to_string(), err_src.to_string())),
671                ..Default::default()
672              });
673            }
674          }
675        }
676      }
677
678      return Err(err);
679    }
680    Err(_) => return Err(FetchError::RequestCanceled),
681  };
682
683  let status = res.status();
684  let url = request.url.into();
685  let mut res_headers = Vec::new();
686  for (key, val) in res.headers().iter() {
687    res_headers.push((key.as_str().into(), val.as_bytes().into()));
688  }
689
690  let content_length = hyper::body::Body::size_hint(res.body()).exact();
691  let remote_addr = res
692    .extensions()
693    .get::<hyper_util::client::legacy::connect::HttpInfo>()
694    .map(|info| info.remote_addr());
695  let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr {
696    (Some(addr.ip().to_string()), Some(addr.port()))
697  } else {
698    (None, None)
699  };
700
701  let response_rid = state
702    .borrow_mut()
703    .resource_table
704    .add(FetchResponseResource::new(res, content_length));
705
706  Ok(FetchResponse {
707    status: status.as_u16(),
708    status_text: status.canonical_reason().unwrap_or("").to_string(),
709    headers: res_headers,
710    url,
711    response_rid,
712    content_length,
713    remote_addr_ip,
714    remote_addr_port,
715    error: None,
716  })
717}
718
719type CancelableResponseResult =
720  Result<Result<http::Response<ResBody>, FetchError>, Canceled>;
721
722pub struct FetchRequestResource {
723  pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
724  pub url: Url,
725}
726
727impl Resource for FetchRequestResource {
728  fn name(&self) -> Cow<str> {
729    "fetchRequest".into()
730  }
731}
732
733pub struct FetchCancelHandle(pub Rc<CancelHandle>);
734
735impl Resource for FetchCancelHandle {
736  fn name(&self) -> Cow<str> {
737    "fetchCancelHandle".into()
738  }
739
740  fn close(self: Rc<Self>) {
741    self.0.cancel()
742  }
743}
744
745type BytesStream =
746  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
747
748pub enum FetchResponseReader {
749  Start(http::Response<ResBody>),
750  BodyReader(Peekable<BytesStream>),
751}
752
753impl Default for FetchResponseReader {
754  fn default() -> Self {
755    let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
756    Self::BodyReader(stream.peekable())
757  }
758}
759#[derive(Debug)]
760pub struct FetchResponseResource {
761  pub response_reader: AsyncRefCell<FetchResponseReader>,
762  pub cancel: CancelHandle,
763  pub size: Option<u64>,
764}
765
766impl FetchResponseResource {
767  pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
768    Self {
769      response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
770      cancel: CancelHandle::default(),
771      size,
772    }
773  }
774
775  pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
776    let reader = self.response_reader.into_inner();
777    match reader {
778      FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
779      _ => unreachable!(),
780    }
781  }
782}
783
784impl Resource for FetchResponseResource {
785  fn name(&self) -> Cow<str> {
786    "fetchResponse".into()
787  }
788
789  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
790    Box::pin(async move {
791      let mut reader =
792        RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
793
794      let body = loop {
795        match &mut *reader {
796          FetchResponseReader::BodyReader(reader) => break reader,
797          FetchResponseReader::Start(_) => {}
798        }
799
800        match std::mem::take(&mut *reader) {
801          FetchResponseReader::Start(resp) => {
802            let stream: BytesStream =
803              Box::pin(resp.into_body().into_data_stream().map(|r| {
804                r.map_err(|err| {
805                  std::io::Error::new(std::io::ErrorKind::Other, err)
806                })
807              }));
808            *reader = FetchResponseReader::BodyReader(stream.peekable());
809          }
810          FetchResponseReader::BodyReader(_) => unreachable!(),
811        }
812      };
813      let fut = async move {
814        let mut reader = Pin::new(body);
815        loop {
816          match reader.as_mut().peek_mut().await {
817            Some(Ok(chunk)) if !chunk.is_empty() => {
818              let len = min(limit, chunk.len());
819              let chunk = chunk.split_to(len);
820              break Ok(chunk.into());
821            }
822            // This unwrap is safe because `peek_mut()` returned `Some`, and thus
823            // currently has a peeked value that can be synchronously returned
824            // from `next()`.
825            //
826            // The future returned from `next()` is always ready, so we can
827            // safely call `await` on it without creating a race condition.
828            Some(_) => match reader.as_mut().next().await.unwrap() {
829              Ok(chunk) => assert!(chunk.is_empty()),
830              Err(err) => break Err(JsErrorBox::type_error(err.to_string())),
831            },
832            None => break Ok(BufView::empty()),
833          }
834        }
835      };
836
837      let cancel_handle = RcRef::map(self, |r| &r.cancel);
838      fut
839        .try_or_cancel(cancel_handle)
840        .await
841        .map_err(JsErrorBox::from_err)
842    })
843  }
844
845  fn size_hint(&self) -> (u64, Option<u64>) {
846    (self.size.unwrap_or(0), self.size)
847  }
848
849  fn close(self: Rc<Self>) {
850    self.cancel.cancel()
851  }
852}
853
854pub struct HttpClientResource {
855  pub client: Client,
856  pub allow_host: bool,
857}
858
859impl Resource for HttpClientResource {
860  fn name(&self) -> Cow<str> {
861    "httpClient".into()
862  }
863}
864
865impl HttpClientResource {
866  fn new(client: Client, allow_host: bool) -> Self {
867    Self { client, allow_host }
868  }
869}
870
871#[derive(Deserialize, Debug)]
872#[serde(rename_all = "camelCase")]
873pub struct CreateHttpClientArgs {
874  ca_certs: Vec<String>,
875  proxy: Option<Proxy>,
876  pool_max_idle_per_host: Option<usize>,
877  pool_idle_timeout: Option<serde_json::Value>,
878  #[serde(default)]
879  use_hickory_resolver: bool,
880  #[serde(default = "default_true")]
881  http1: bool,
882  #[serde(default = "default_true")]
883  http2: bool,
884  #[serde(default)]
885  allow_host: bool,
886}
887
888fn default_true() -> bool {
889  true
890}
891
892#[op2(stack_trace)]
893#[smi]
894pub fn op_fetch_custom_client<FP>(
895  state: &mut OpState,
896  #[serde] args: CreateHttpClientArgs,
897  #[cppgc] tls_keys: &TlsKeysHolder,
898) -> Result<ResourceId, FetchError>
899where
900  FP: FetchPermissions + 'static,
901{
902  if let Some(proxy) = args.proxy.clone() {
903    let permissions = state.borrow_mut::<FP>();
904    let url = Url::parse(&proxy.url)?;
905    permissions.check_net_url(&url, "Deno.createHttpClient()")?;
906  }
907
908  let options = state.borrow::<Options>();
909  let ca_certs = args
910    .ca_certs
911    .into_iter()
912    .map(|cert| cert.into_bytes())
913    .collect::<Vec<_>>();
914
915  let client = create_http_client(
916    &options.user_agent,
917    CreateHttpClientOptions {
918      root_cert_store: options
919        .root_cert_store()
920        .map_err(HttpClientCreateError::RootCertStore)?,
921      ca_certs,
922      proxy: args.proxy,
923      dns_resolver: if args.use_hickory_resolver {
924        dns::Resolver::hickory().map_err(FetchError::Dns)?
925      } else {
926        dns::Resolver::default()
927      },
928      unsafely_ignore_certificate_errors: options
929        .unsafely_ignore_certificate_errors
930        .clone(),
931      client_cert_chain_and_key: tls_keys.take().try_into().unwrap(),
932      pool_max_idle_per_host: args.pool_max_idle_per_host,
933      pool_idle_timeout: args.pool_idle_timeout.and_then(
934        |timeout| match timeout {
935          serde_json::Value::Bool(true) => None,
936          serde_json::Value::Bool(false) => Some(None),
937          serde_json::Value::Number(specify) => {
938            Some(Some(specify.as_u64().unwrap_or_default()))
939          }
940          _ => Some(None),
941        },
942      ),
943      http1: args.http1,
944      http2: args.http2,
945      client_builder_hook: options.client_builder_hook,
946    },
947  )?;
948
949  let rid = state
950    .resource_table
951    .add(HttpClientResource::new(client, args.allow_host));
952  Ok(rid)
953}
954
955#[derive(Debug, Clone)]
956pub struct CreateHttpClientOptions {
957  pub root_cert_store: Option<RootCertStore>,
958  pub ca_certs: Vec<Vec<u8>>,
959  pub proxy: Option<Proxy>,
960  pub dns_resolver: dns::Resolver,
961  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
962  pub client_cert_chain_and_key: Option<TlsKey>,
963  pub pool_max_idle_per_host: Option<usize>,
964  pub pool_idle_timeout: Option<Option<u64>>,
965  pub http1: bool,
966  pub http2: bool,
967  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
968}
969
970impl Default for CreateHttpClientOptions {
971  fn default() -> Self {
972    CreateHttpClientOptions {
973      root_cert_store: None,
974      ca_certs: vec![],
975      proxy: None,
976      dns_resolver: dns::Resolver::default(),
977      unsafely_ignore_certificate_errors: None,
978      client_cert_chain_and_key: None,
979      pool_max_idle_per_host: None,
980      pool_idle_timeout: None,
981      http1: true,
982      http2: true,
983      client_builder_hook: None,
984    }
985  }
986}
987
988#[derive(Debug, thiserror::Error, deno_error::JsError)]
989#[class(type)]
990pub enum HttpClientCreateError {
991  #[error(transparent)]
992  Tls(deno_tls::TlsError),
993  #[error("Illegal characters in User-Agent: received {0}")]
994  InvalidUserAgent(String),
995  #[error("invalid proxy url")]
996  InvalidProxyUrl,
997  #[error("Cannot create Http Client: either `http1` or `http2` needs to be set to true")]
998  HttpVersionSelectionInvalid,
999  #[class(inherit)]
1000  #[error(transparent)]
1001  RootCertStore(JsErrorBox),
1002}
1003
1004/// Create new instance of async Client. This client supports
1005/// proxies and doesn't follow redirects.
1006pub fn create_http_client(
1007  user_agent: &str,
1008  options: CreateHttpClientOptions,
1009) -> Result<Client, HttpClientCreateError> {
1010  let mut tls_config = deno_tls::create_client_config(
1011    options.root_cert_store,
1012    options.ca_certs,
1013    options.unsafely_ignore_certificate_errors,
1014    options.client_cert_chain_and_key.into(),
1015    deno_tls::SocketUse::Http,
1016  )
1017  .map_err(HttpClientCreateError::Tls)?;
1018
1019  // Proxy TLS should not send ALPN
1020  tls_config.alpn_protocols.clear();
1021  let proxy_tls_config = Arc::from(tls_config.clone());
1022
1023  let mut alpn_protocols = vec![];
1024  if options.http2 {
1025    alpn_protocols.push("h2".into());
1026  }
1027  if options.http1 {
1028    alpn_protocols.push("http/1.1".into());
1029  }
1030  tls_config.alpn_protocols = alpn_protocols;
1031  let tls_config = Arc::from(tls_config);
1032
1033  let mut http_connector =
1034    HttpConnector::new_with_resolver(options.dns_resolver.clone());
1035  http_connector.enforce_http(false);
1036
1037  let user_agent = user_agent.parse::<HeaderValue>().map_err(|_| {
1038    HttpClientCreateError::InvalidUserAgent(user_agent.to_string())
1039  })?;
1040
1041  let mut builder = HyperClientBuilder::new(TokioExecutor::new());
1042  builder.timer(TokioTimer::new());
1043  builder.pool_timer(TokioTimer::new());
1044
1045  if let Some(client_builder_hook) = options.client_builder_hook {
1046    builder = client_builder_hook(builder);
1047  }
1048
1049  let mut proxies = proxy::from_env();
1050  if let Some(proxy) = options.proxy {
1051    let mut intercept = proxy::Intercept::all(&proxy.url)
1052      .ok_or_else(|| HttpClientCreateError::InvalidProxyUrl)?;
1053    if let Some(basic_auth) = &proxy.basic_auth {
1054      intercept.set_auth(&basic_auth.username, &basic_auth.password);
1055    }
1056    proxies.prepend(intercept);
1057  }
1058  let proxies = Arc::new(proxies);
1059  let connector = proxy::ProxyConnector {
1060    http: http_connector,
1061    proxies: proxies.clone(),
1062    tls: tls_config,
1063    tls_proxy: proxy_tls_config,
1064    user_agent: Some(user_agent.clone()),
1065  };
1066
1067  if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
1068    builder.pool_max_idle_per_host(pool_max_idle_per_host);
1069  }
1070
1071  if let Some(pool_idle_timeout) = options.pool_idle_timeout {
1072    builder.pool_idle_timeout(
1073      pool_idle_timeout.map(std::time::Duration::from_millis),
1074    );
1075  }
1076
1077  match (options.http1, options.http2) {
1078    (true, false) => {} // noop, handled by ALPN above
1079    (false, true) => {
1080      builder.http2_only(true);
1081    }
1082    (true, true) => {}
1083    (false, false) => {
1084      return Err(HttpClientCreateError::HttpVersionSelectionInvalid)
1085    }
1086  }
1087
1088  let pooled_client = builder.build(connector);
1089  let retry_client = retry::Retry::new(FetchRetry, pooled_client);
1090  let decompress = Decompression::new(retry_client).gzip(true).br(true);
1091
1092  Ok(Client {
1093    inner: decompress,
1094    proxies,
1095    user_agent,
1096  })
1097}
1098
1099#[op2]
1100#[serde]
1101pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString {
1102  input.into()
1103}
1104
1105#[derive(Clone, Debug)]
1106pub struct Client {
1107  inner: Decompression<
1108    retry::Retry<
1109      FetchRetry,
1110      hyper_util::client::legacy::Client<Connector, ReqBody>,
1111    >,
1112  >,
1113  // Used to check whether to include a proxy-authorization header
1114  proxies: Arc<proxy::Proxies>,
1115  user_agent: HeaderValue,
1116}
1117
1118type Connector = proxy::ProxyConnector<HttpConnector<dns::Resolver>>;
1119
1120// clippy is wrong here
1121#[allow(clippy::declare_interior_mutable_const)]
1122const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*");
1123
1124#[derive(Debug, deno_error::JsError)]
1125#[class(type)]
1126pub struct ClientSendError {
1127  uri: Uri,
1128  pub source: hyper_util::client::legacy::Error,
1129}
1130
1131impl ClientSendError {
1132  pub fn is_connect_error(&self) -> bool {
1133    self.source.is_connect()
1134  }
1135
1136  fn http_info(&self) -> Option<HttpInfo> {
1137    let mut exts = Extensions::new();
1138    self.source.connect_info()?.get_extras(&mut exts);
1139    exts.remove::<HttpInfo>()
1140  }
1141}
1142
1143impl std::fmt::Display for ClientSendError {
1144  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1145    // NOTE: we can use `std::error::Report` instead once it's stabilized.
1146    let detail = error_reporter::Report::new(&self.source);
1147
1148    match self.http_info() {
1149      Some(http_info) => {
1150        write!(
1151          f,
1152          "error sending request from {src} for {uri} ({dst}): {detail}",
1153          src = http_info.local_addr(),
1154          uri = self.uri,
1155          dst = http_info.remote_addr(),
1156          detail = detail,
1157        )
1158      }
1159      None => {
1160        write!(
1161          f,
1162          "error sending request for url ({uri}): {detail}",
1163          uri = self.uri,
1164          detail = detail,
1165        )
1166      }
1167    }
1168  }
1169}
1170
1171impl std::error::Error for ClientSendError {
1172  fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1173    Some(&self.source)
1174  }
1175}
1176
1177impl Client {
1178  pub async fn send(
1179    self,
1180    mut req: http::Request<ReqBody>,
1181  ) -> Result<http::Response<ResBody>, ClientSendError> {
1182    req
1183      .headers_mut()
1184      .entry(USER_AGENT)
1185      .or_insert_with(|| self.user_agent.clone());
1186
1187    req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
1188
1189    if let Some(auth) = self.proxies.http_forward_auth(req.uri()) {
1190      req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
1191    }
1192
1193    let uri = req.uri().clone();
1194
1195    let resp = self
1196      .inner
1197      .oneshot(req)
1198      .await
1199      .map_err(|e| ClientSendError { uri, source: e })?;
1200    Ok(resp.map(|b| b.map_err(|e| JsErrorBox::generic(e.to_string())).boxed()))
1201  }
1202}
1203
1204// This is a custom enum to allow the retry policy to clone the variants that could be retried.
1205pub enum ReqBody {
1206  Full(http_body_util::Full<Bytes>),
1207  Empty(http_body_util::Empty<Bytes>),
1208  Streaming(BoxBody<Bytes, JsErrorBox>),
1209}
1210
1211pub type ResBody = BoxBody<Bytes, JsErrorBox>;
1212
1213impl ReqBody {
1214  pub fn full(bytes: Bytes) -> Self {
1215    ReqBody::Full(http_body_util::Full::new(bytes))
1216  }
1217
1218  pub fn empty() -> Self {
1219    ReqBody::Empty(http_body_util::Empty::new())
1220  }
1221
1222  pub fn streaming<B>(body: B) -> Self
1223  where
1224    B: hyper::body::Body<Data = Bytes, Error = JsErrorBox>
1225      + Send
1226      + Sync
1227      + 'static,
1228  {
1229    ReqBody::Streaming(BoxBody::new(body))
1230  }
1231}
1232
1233impl hyper::body::Body for ReqBody {
1234  type Data = Bytes;
1235  type Error = JsErrorBox;
1236
1237  fn poll_frame(
1238    mut self: Pin<&mut Self>,
1239    cx: &mut Context<'_>,
1240  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1241    match &mut *self {
1242      ReqBody::Full(ref mut b) => {
1243        Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1244      }
1245      ReqBody::Empty(ref mut b) => {
1246        Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1247      }
1248      ReqBody::Streaming(ref mut b) => Pin::new(b).poll_frame(cx),
1249    }
1250  }
1251
1252  fn is_end_stream(&self) -> bool {
1253    match self {
1254      ReqBody::Full(ref b) => b.is_end_stream(),
1255      ReqBody::Empty(ref b) => b.is_end_stream(),
1256      ReqBody::Streaming(ref b) => b.is_end_stream(),
1257    }
1258  }
1259
1260  fn size_hint(&self) -> hyper::body::SizeHint {
1261    match self {
1262      ReqBody::Full(ref b) => b.size_hint(),
1263      ReqBody::Empty(ref b) => b.size_hint(),
1264      ReqBody::Streaming(ref b) => b.size_hint(),
1265    }
1266  }
1267}
1268
1269/// Copied from https://github.com/seanmonstar/reqwest/blob/b9d62a0323d96f11672a61a17bf8849baec00275/src/async_impl/request.rs#L572
1270/// Check the request URL for a "username:password" type authority, and if
1271/// found, remove it from the URL and return it.
1272pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
1273  use percent_encoding::percent_decode;
1274
1275  if url.has_authority() {
1276    let username: String = percent_decode(url.username().as_bytes())
1277      .decode_utf8()
1278      .ok()?
1279      .into();
1280    let password = url.password().and_then(|pass| {
1281      percent_decode(pass.as_bytes())
1282        .decode_utf8()
1283        .ok()
1284        .map(String::from)
1285    });
1286    if !username.is_empty() || password.is_some() {
1287      url
1288        .set_username("")
1289        .expect("has_authority means set_username shouldn't fail");
1290      url
1291        .set_password(None)
1292        .expect("has_authority means set_password shouldn't fail");
1293      return Some((username, password));
1294    }
1295  }
1296
1297  None
1298}
1299
1300#[op2(fast)]
1301fn op_fetch_promise_is_settled(promise: v8::Local<v8::Promise>) -> bool {
1302  promise.state() != v8::PromiseState::Pending
1303}
1304
1305/// Deno.fetch's retry policy.
1306#[derive(Clone, Debug)]
1307struct FetchRetry;
1308
1309/// Marker extension that a request has been retried once.
1310#[derive(Clone, Debug)]
1311struct Retried;
1312
1313impl<ResBody, E>
1314  retry::Policy<http::Request<ReqBody>, http::Response<ResBody>, E>
1315  for FetchRetry
1316where
1317  E: std::error::Error + 'static,
1318{
1319  /// Don't delay retries.
1320  type Future = future::Ready<()>;
1321
1322  fn retry(
1323    &mut self,
1324    req: &mut http::Request<ReqBody>,
1325    result: &mut Result<http::Response<ResBody>, E>,
1326  ) -> Option<Self::Future> {
1327    if req.extensions().get::<Retried>().is_some() {
1328      // only retry once
1329      return None;
1330    }
1331
1332    match result {
1333      Ok(..) => {
1334        // never retry a Response
1335        None
1336      }
1337      Err(err) => {
1338        if is_error_retryable(&*err) {
1339          req.extensions_mut().insert(Retried);
1340          Some(future::ready(()))
1341        } else {
1342          None
1343        }
1344      }
1345    }
1346  }
1347
1348  fn clone_request(
1349    &mut self,
1350    req: &http::Request<ReqBody>,
1351  ) -> Option<http::Request<ReqBody>> {
1352    let body = match req.body() {
1353      ReqBody::Full(b) => ReqBody::Full(b.clone()),
1354      ReqBody::Empty(b) => ReqBody::Empty(*b),
1355      ReqBody::Streaming(..) => return None,
1356    };
1357
1358    let mut clone = http::Request::new(body);
1359    *clone.method_mut() = req.method().clone();
1360    *clone.uri_mut() = req.uri().clone();
1361    *clone.headers_mut() = req.headers().clone();
1362    *clone.extensions_mut() = req.extensions().clone();
1363    Some(clone)
1364  }
1365}
1366
1367fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool {
1368  // Note: hyper doesn't promise it will always be this h2 version. Keep up to date.
1369  if let Some(err) = find_source::<h2::Error>(err) {
1370    // They sent us a graceful shutdown, try with a new connection!
1371    if err.is_go_away()
1372      && err.is_remote()
1373      && err.reason() == Some(h2::Reason::NO_ERROR)
1374    {
1375      return true;
1376    }
1377
1378    // REFUSED_STREAM was sent from the server, which is safe to retry.
1379    // https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2
1380    if err.is_reset()
1381      && err.is_remote()
1382      && err.reason() == Some(h2::Reason::REFUSED_STREAM)
1383    {
1384      return true;
1385    }
1386  }
1387
1388  false
1389}
1390
1391fn find_source<'a, E: std::error::Error + 'static>(
1392  err: &'a (dyn std::error::Error + 'static),
1393) -> Option<&'a E> {
1394  let mut err = Some(err);
1395  while let Some(src) = err {
1396    if let Some(found) = src.downcast_ref::<E>() {
1397      return Some(found);
1398    }
1399    err = src.source();
1400  }
1401  None
1402}