Skip to main content

deno_fetch/
lib.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2
3pub mod dns;
4mod fs_fetch_handler;
5mod proxy;
6#[cfg(test)]
7mod tests;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::cmp::min;
12use std::convert::From;
13use std::future;
14use std::future::Future;
15use std::net::IpAddr;
16use std::path::Path;
17#[cfg(not(windows))]
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24
25use bytes::Bytes;
26// Re-export data_url
27pub use data_url;
28use data_url::DataUrl;
29use deno_core::AsyncRefCell;
30use deno_core::AsyncResult;
31use deno_core::BufView;
32use deno_core::CancelFuture;
33use deno_core::CancelHandle;
34use deno_core::CancelTryFuture;
35use deno_core::Canceled;
36use deno_core::FromV8;
37use deno_core::OpState;
38use deno_core::RcRef;
39use deno_core::Resource;
40use deno_core::ResourceId;
41use deno_core::ToV8;
42use deno_core::convert::ByteString;
43use deno_core::convert::Uint8Array;
44use deno_core::futures::FutureExt;
45use deno_core::futures::Stream;
46use deno_core::futures::StreamExt;
47use deno_core::futures::TryFutureExt;
48use deno_core::futures::stream::Peekable;
49use deno_core::op2;
50use deno_core::url;
51use deno_core::url::Url;
52use deno_core::v8;
53use deno_error::JsErrorBox;
54pub use deno_fs::FsError;
55use deno_path_util::PathToUrlError;
56use deno_permissions::OpenAccessKind;
57use deno_permissions::PermissionCheckError;
58use deno_permissions::PermissionsContainer;
59use deno_tls::Proxy;
60use deno_tls::RootCertStoreProvider;
61use deno_tls::SocketUse;
62use deno_tls::TlsKey;
63use deno_tls::TlsKeys;
64use deno_tls::TlsKeysHolder;
65use deno_tls::rustls::RootCertStore;
66pub use fs_fetch_handler::FsFetchHandler;
67use http::Extensions;
68use http::HeaderMap;
69use http::Method;
70use http::Uri;
71use http::header::ACCEPT;
72use http::header::ACCEPT_ENCODING;
73use http::header::AUTHORIZATION;
74use http::header::CONTENT_LENGTH;
75use http::header::HOST;
76use http::header::HeaderName;
77use http::header::HeaderValue;
78use http::header::PROXY_AUTHORIZATION;
79use http::header::RANGE;
80use http::header::USER_AGENT;
81use http_body_util::BodyExt;
82use http_body_util::combinators::BoxBody;
83use hyper::body::Frame;
84use hyper_util::client::legacy::Builder as HyperClientBuilder;
85use hyper_util::client::legacy::connect::Connection;
86use hyper_util::client::legacy::connect::HttpConnector;
87use hyper_util::client::legacy::connect::HttpInfo;
88use hyper_util::rt::TokioExecutor;
89use hyper_util::rt::TokioIo;
90use hyper_util::rt::TokioTimer;
91pub use proxy::basic_auth;
92use tower::BoxError;
93use tower::Service;
94use tower::ServiceExt;
95use tower::retry;
96use tower_http::decompression::Decompression;
97
98#[derive(Clone)]
99pub struct Options {
100  pub user_agent: String,
101  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
102  pub proxy: Option<Proxy>,
103  /// A callback to customize HTTP client configuration.
104  ///
105  /// The settings applied with this hook may be overridden by the options
106  /// provided through `Deno.createHttpClient()` API. For instance, if the hook
107  /// calls [`hyper_util::client::legacy::Builder::pool_max_idle_per_host`] with
108  /// a value of 99, and a user calls `Deno.createHttpClient({ poolMaxIdlePerHost: 42 })`,
109  /// the value that will take effect is 42.
110  ///
111  /// For more info on what can be configured, see [`hyper_util::client::legacy::Builder`].
112  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
113  #[allow(clippy::type_complexity, reason = "TODO: improve")]
114  pub request_builder_hook:
115    Option<fn(&mut http::Request<ReqBody>) -> Result<(), JsErrorBox>>,
116  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
117  pub client_cert_chain_and_key: TlsKeys,
118  pub file_fetch_handler: Rc<dyn FetchHandler>,
119  pub resolver: dns::Resolver,
120}
121
122impl Options {
123  pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
124    Ok(match &self.root_cert_store_provider {
125      Some(provider) => Some(provider.get_or_try_init()?.clone()),
126      None => None,
127    })
128  }
129}
130
131impl Default for Options {
132  fn default() -> Self {
133    Self {
134      user_agent: "".to_string(),
135      root_cert_store_provider: None,
136      proxy: None,
137      client_builder_hook: None,
138      request_builder_hook: None,
139      unsafely_ignore_certificate_errors: None,
140      client_cert_chain_and_key: TlsKeys::Null,
141      file_fetch_handler: Rc::new(DefaultFileFetchHandler),
142      resolver: dns::Resolver::default(),
143    }
144  }
145}
146
147deno_core::extension!(deno_fetch,
148  deps = [ deno_webidl, deno_web ],
149  ops = [
150    op_fetch,
151    op_fetch_send,
152    op_utf8_to_byte_string,
153    op_fetch_custom_client,
154    op_fetch_promise_is_settled,
155  ],
156  esm = [
157    "20_headers.js",
158    "21_formdata.js",
159    "22_body.js",
160    "22_http_client.js",
161    "23_request.js",
162    "23_response.js",
163    "26_fetch.js",
164    "27_eventsource.js"
165  ],
166  options = {
167    options: Options,
168  },
169  state = |state, options| {
170    state.put::<Options>(options.options);
171  },
172);
173
174#[derive(Debug, thiserror::Error, deno_error::JsError)]
175pub enum FetchError {
176  #[class(inherit)]
177  #[error(transparent)]
178  Resource(#[from] deno_core::error::ResourceError),
179  #[class(inherit)]
180  #[error(transparent)]
181  Permission(#[from] PermissionCheckError),
182  #[class(type)]
183  #[error("NetworkError when attempting to fetch resource")]
184  NetworkError,
185  #[class(type)]
186  #[error("Fetching files only supports the GET method: received {0}")]
187  FsNotGet(Method),
188  #[class(inherit)]
189  #[error(transparent)]
190  PathToUrl(#[from] PathToUrlError),
191  #[class(type)]
192  #[error("Invalid URL {0}")]
193  InvalidUrl(Url),
194  #[class(type)]
195  #[error(transparent)]
196  InvalidHeaderName(#[from] http::header::InvalidHeaderName),
197  #[class(type)]
198  #[error(transparent)]
199  InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
200  #[class(type)]
201  #[error("{0:?}")]
202  DataUrl(data_url::DataUrlError),
203  #[class(type)]
204  #[error("{0:?}")]
205  Base64(data_url::forgiving_base64::InvalidBase64),
206  #[class(type)]
207  #[error("Blob for the given URL not found.")]
208  BlobNotFound,
209  #[class(type)]
210  #[error("Url scheme '{0}' not supported")]
211  SchemeNotSupported(String),
212  #[class(type)]
213  #[error("Request was cancelled")]
214  RequestCanceled,
215  #[class(generic)]
216  #[error(transparent)]
217  Http(#[from] http::Error),
218  #[class(inherit)]
219  #[error(transparent)]
220  ClientCreate(#[from] HttpClientCreateError),
221  #[class(inherit)]
222  #[error(transparent)]
223  Url(#[from] url::ParseError),
224  #[class(inherit)]
225  #[error(transparent)]
226  UrlToFilePath(#[from] deno_path_util::UrlToFilePathError),
227  #[class(type)]
228  #[error(transparent)]
229  Method(#[from] http::method::InvalidMethod),
230  #[class(inherit)]
231  #[error(transparent)]
232  ClientSend(#[from] ClientSendError),
233  #[class(inherit)]
234  #[error(transparent)]
235  RequestBuilderHook(JsErrorBox),
236  #[class(inherit)]
237  #[error(transparent)]
238  Io(#[from] std::io::Error),
239  #[class(generic)]
240  #[error(transparent)]
241  Dns(hickory_resolver::ResolveError),
242  #[class(generic)]
243  #[error(transparent)]
244  PermissionCheck(PermissionCheckError),
245  #[class(inherit)]
246  #[error(transparent)]
247  Other(JsErrorBox),
248}
249
250impl From<deno_fs::FsError> for FetchError {
251  fn from(value: deno_fs::FsError) -> Self {
252    match value {
253      deno_fs::FsError::Io(_)
254      | deno_fs::FsError::FileBusy
255      | deno_fs::FsError::NotSupported => FetchError::NetworkError,
256      deno_fs::FsError::PermissionCheck(err) => {
257        FetchError::PermissionCheck(err)
258      }
259      deno_fs::FsError::JoinError(err) => {
260        FetchError::Other(JsErrorBox::from_err(err))
261      }
262    }
263  }
264}
265
266pub type CancelableResponseFuture =
267  Pin<Box<dyn Future<Output = CancelableResponseResult>>>;
268
269pub trait FetchHandler: dyn_clone::DynClone {
270  // Return the result of the fetch request consisting of a tuple of the
271  // cancelable response result, the optional fetch body resource and the
272  // optional cancel handle.
273  fn fetch_file(
274    &self,
275    state: &mut OpState,
276    url: &Url,
277  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
278}
279
280dyn_clone::clone_trait_object!(FetchHandler);
281
282/// A default implementation which will error for every request.
283#[derive(Clone)]
284pub struct DefaultFileFetchHandler;
285
286impl FetchHandler for DefaultFileFetchHandler {
287  fn fetch_file(
288    &self,
289    _state: &mut OpState,
290    _url: &Url,
291  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
292    let fut = async move { Ok(Err(FetchError::NetworkError)) };
293    (Box::pin(fut), None)
294  }
295}
296
297#[derive(ToV8)]
298pub struct FetchReturn {
299  pub request_rid: ResourceId,
300  pub cancel_handle_rid: Option<ResourceId>,
301}
302
303pub fn get_or_create_client_from_state(
304  state: &mut OpState,
305) -> Result<Client, HttpClientCreateError> {
306  if let Some(client) = state.try_borrow::<Client>() {
307    Ok(client.clone())
308  } else {
309    let options = state.borrow::<Options>();
310    let client = create_client_from_options(options)?;
311    state.put::<Client>(client.clone());
312    Ok(client)
313  }
314}
315
316pub fn create_client_from_options(
317  options: &Options,
318) -> Result<Client, HttpClientCreateError> {
319  create_http_client(
320    &options.user_agent,
321    CreateHttpClientOptions {
322      root_cert_store: options
323        .root_cert_store()
324        .map_err(HttpClientCreateError::RootCertStore)?,
325      ca_certs: vec![],
326      proxy: options.proxy.clone(),
327      dns_resolver: options.resolver.clone(),
328      unsafely_ignore_certificate_errors: options
329        .unsafely_ignore_certificate_errors
330        .clone(),
331      client_cert_chain_and_key: options
332        .client_cert_chain_and_key
333        .clone()
334        .try_into()
335        .unwrap_or_default(),
336      pool_max_idle_per_host: None,
337      pool_idle_timeout: None,
338      http1: true,
339      http2: true,
340      local_address: None,
341      client_builder_hook: options.client_builder_hook,
342    },
343  )
344}
345
346#[allow(clippy::type_complexity, reason = "TODO: improve")]
347pub struct ResourceToBodyAdapter(
348  Rc<dyn Resource>,
349  Option<Pin<Box<dyn Future<Output = Result<BufView, JsErrorBox>>>>>,
350);
351
352impl ResourceToBodyAdapter {
353  pub fn new(resource: Rc<dyn Resource>) -> Self {
354    let future = resource.clone().read(64 * 1024);
355    Self(resource, Some(future))
356  }
357}
358
359// SAFETY: we only use this on a single-threaded executor
360unsafe impl Send for ResourceToBodyAdapter {}
361// SAFETY: we only use this on a single-threaded executor
362unsafe impl Sync for ResourceToBodyAdapter {}
363
364impl Stream for ResourceToBodyAdapter {
365  type Item = Result<Bytes, JsErrorBox>;
366
367  fn poll_next(
368    self: Pin<&mut Self>,
369    cx: &mut Context<'_>,
370  ) -> Poll<Option<Self::Item>> {
371    let this = self.get_mut();
372    match this.1.take() {
373      Some(mut fut) => match fut.poll_unpin(cx) {
374        Poll::Pending => {
375          this.1 = Some(fut);
376          Poll::Pending
377        }
378        Poll::Ready(res) => match res {
379          Ok(buf) if buf.is_empty() => Poll::Ready(None),
380          Ok(buf) => {
381            this.1 = Some(this.0.clone().read(64 * 1024));
382            Poll::Ready(Some(Ok(buf.to_vec().into())))
383          }
384          Err(err) => Poll::Ready(Some(Err(err))),
385        },
386      },
387      _ => Poll::Ready(None),
388    }
389  }
390}
391
392impl hyper::body::Body for ResourceToBodyAdapter {
393  type Data = Bytes;
394  type Error = JsErrorBox;
395
396  fn poll_frame(
397    self: Pin<&mut Self>,
398    cx: &mut Context<'_>,
399  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
400    match self.poll_next(cx) {
401      Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
402      Poll::Ready(None) => Poll::Ready(None),
403      Poll::Pending => Poll::Pending,
404    }
405  }
406}
407
408impl Drop for ResourceToBodyAdapter {
409  fn drop(&mut self) {
410    self.0.clone().close()
411  }
412}
413
414#[op2(stack_trace)]
415#[allow(clippy::too_many_arguments, reason = "op")]
416#[allow(clippy::large_enum_variant, reason = "TODO: investigate")]
417#[allow(clippy::result_large_err, reason = "TODO: investigate")]
418pub fn op_fetch(
419  state: &mut OpState,
420  #[scoped] method: ByteString,
421  #[string] url: String,
422  #[scoped] headers: Vec<(ByteString, ByteString)>,
423  #[smi] client_rid: Option<u32>,
424  has_body: bool,
425  data: Option<Uint8Array>,
426  #[smi] resource: Option<ResourceId>,
427) -> Result<FetchReturn, FetchError> {
428  let (client, allow_host) = if let Some(rid) = client_rid {
429    let r = state.resource_table.get::<HttpClientResource>(rid)?;
430    (r.client.clone(), r.allow_host)
431  } else {
432    (get_or_create_client_from_state(state)?, false)
433  };
434
435  let method = Method::from_bytes(&method)?;
436  let mut url = Url::parse(&url)?;
437
438  // Check scheme before asking for net permission
439  let scheme = url.scheme();
440  let (request_rid, cancel_handle_rid) = match scheme {
441    "file" => {
442      if method != Method::GET {
443        return Err(FetchError::FsNotGet(method));
444      }
445      let Options {
446        file_fetch_handler, ..
447      } = state.borrow_mut::<Options>();
448      let file_fetch_handler = file_fetch_handler.clone();
449      let (future, maybe_cancel_handle) =
450        file_fetch_handler.fetch_file(state, &url);
451      let request_rid = state
452        .resource_table
453        .add(FetchRequestResource { future, url });
454      let maybe_cancel_handle_rid = maybe_cancel_handle
455        .map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
456
457      (request_rid, maybe_cancel_handle_rid)
458    }
459    "http" | "https" => {
460      let permissions = state.borrow_mut::<PermissionsContainer>();
461      permissions.check_net_url(&url, "fetch()")?;
462
463      let maybe_authority = extract_authority(&mut url);
464      let uri = url
465        .as_str()
466        .parse::<Uri>()
467        .map_err(|_| FetchError::InvalidUrl(url.clone()))?;
468
469      let mut con_len = None;
470      let body = if has_body {
471        match (data, resource) {
472          (Some(data), _) => {
473            // If a body is passed, we use it, and don't return a body for streaming.
474            con_len = Some(data.len() as u64);
475
476            ReqBody::full(data.0.into())
477          }
478          (_, Some(resource)) => {
479            let resource = state.resource_table.take_any(resource)?;
480            match resource.size_hint() {
481              (body_size, Some(n)) if body_size == n && body_size > 0 => {
482                con_len = Some(body_size);
483              }
484              _ => {}
485            }
486            ReqBody::streaming(ResourceToBodyAdapter::new(resource))
487          }
488          (None, None) => unreachable!(),
489        }
490      } else {
491        // POST and PUT requests should always have a 0 length content-length,
492        // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
493        if matches!(method, Method::POST | Method::PUT) {
494          con_len = Some(0);
495        }
496        ReqBody::empty()
497      };
498
499      let mut request = http::Request::new(body);
500      *request.method_mut() = method.clone();
501      *request.uri_mut() = uri.clone();
502
503      if let Some((username, password)) = maybe_authority {
504        request.headers_mut().insert(
505          AUTHORIZATION,
506          proxy::basic_auth(&username, password.as_deref()),
507        );
508      }
509      if let Some(len) = con_len {
510        request.headers_mut().insert(CONTENT_LENGTH, len.into());
511      }
512
513      for (key, value) in headers {
514        let name = HeaderName::from_bytes(&key)?;
515        let v = HeaderValue::from_bytes(&value)?;
516
517        if (name != HOST || allow_host) && name != CONTENT_LENGTH {
518          request.headers_mut().append(name, v);
519        }
520      }
521
522      if request.headers().contains_key(RANGE) {
523        // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18
524        // If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`)
525        request
526          .headers_mut()
527          .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
528      }
529
530      let options = state.borrow::<Options>();
531      if let Some(request_builder_hook) = options.request_builder_hook {
532        request_builder_hook(&mut request)
533          .map_err(FetchError::RequestBuilderHook)?;
534      }
535
536      let cancel_handle = CancelHandle::new_rc();
537      let cancel_handle_ = cancel_handle.clone();
538
539      let fut = async move {
540        client
541          .send(request)
542          .map_err(Into::into)
543          .or_cancel(cancel_handle_)
544          .await
545      };
546
547      let request_rid = state.resource_table.add(FetchRequestResource {
548        future: Box::pin(fut),
549        url,
550      });
551
552      let cancel_handle_rid =
553        state.resource_table.add(FetchCancelHandle(cancel_handle));
554
555      (request_rid, Some(cancel_handle_rid))
556    }
557    "data" => {
558      let data_url =
559        DataUrl::process(url.as_str()).map_err(FetchError::DataUrl)?;
560
561      let (body, _) = data_url.decode_to_vec().map_err(FetchError::Base64)?;
562      let body = http_body_util::Full::new(body.into())
563        .map_err(|never| match never {})
564        .boxed();
565
566      let response = http::Response::builder()
567        .status(http::StatusCode::OK)
568        .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
569        .body(body)?;
570
571      let fut = async move { Ok(Ok(response)) };
572
573      let request_rid = state.resource_table.add(FetchRequestResource {
574        future: Box::pin(fut),
575        url,
576      });
577
578      (request_rid, None)
579    }
580    "blob" => {
581      // Blob URL resolution happens in the JS side of fetch. If we got here is
582      // because the URL isn't an object URL.
583      return Err(FetchError::BlobNotFound);
584    }
585    _ => return Err(FetchError::SchemeNotSupported(scheme.to_string())),
586  };
587
588  Ok(FetchReturn {
589    request_rid,
590    cancel_handle_rid,
591  })
592}
593
594#[derive(Default, ToV8)]
595pub struct FetchResponse {
596  pub status: u16,
597  pub status_text: String,
598  pub headers: Vec<(ByteString, ByteString)>,
599  pub url: String,
600  pub response_rid: ResourceId,
601  #[to_v8(serde)]
602  pub content_length: Option<u64>,
603  /// This field is populated if some error occurred which needs to be
604  /// reconstructed in the JS side to set the error _cause_.
605  /// In the tuple, the first element is an error message and the second one is
606  /// an error cause.
607  pub error: Option<(String, String)>,
608}
609
610#[op2]
611pub async fn op_fetch_send(
612  state: Rc<RefCell<OpState>>,
613  #[smi] rid: ResourceId,
614) -> Result<FetchResponse, FetchError> {
615  let request = state
616    .borrow_mut()
617    .resource_table
618    .take::<FetchRequestResource>(rid)?;
619
620  let request = Rc::try_unwrap(request)
621    .ok()
622    .expect("multiple op_fetch_send ongoing");
623
624  let res = match request.future.await {
625    Ok(Ok(res)) => res,
626    Ok(Err(err)) => {
627      // We're going to try and rescue the error cause from a stream and return it from this fetch.
628      // If any error in the chain is a hyper body error, return that as a special result we can use to
629      // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
630      // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
631
632      if let FetchError::ClientSend(err_src) = &err
633        && let Some(client_err) = std::error::Error::source(&err_src.source)
634        && let Some(err_src) = client_err.downcast_ref::<hyper::Error>()
635        && let Some(err_src) = std::error::Error::source(err_src)
636      {
637        return Ok(FetchResponse {
638          error: Some((err.to_string(), err_src.to_string())),
639          ..Default::default()
640        });
641      }
642
643      return Err(err);
644    }
645    Err(_) => return Err(FetchError::RequestCanceled),
646  };
647
648  let status = res.status();
649  let url = request.url.into();
650  let mut res_headers = Vec::new();
651  for (key, val) in res.headers().iter() {
652    res_headers.push((key.as_str().into(), val.as_bytes().into()));
653  }
654
655  let content_length = hyper::body::Body::size_hint(res.body()).exact();
656
657  let response_rid = state
658    .borrow_mut()
659    .resource_table
660    .add(FetchResponseResource::new(res, content_length));
661
662  Ok(FetchResponse {
663    status: status.as_u16(),
664    status_text: status.canonical_reason().unwrap_or("").to_string(),
665    headers: res_headers,
666    url,
667    response_rid,
668    content_length,
669    error: None,
670  })
671}
672
673type CancelableResponseResult =
674  Result<Result<http::Response<ResBody>, FetchError>, Canceled>;
675
676pub struct FetchRequestResource {
677  pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
678  pub url: Url,
679}
680
681impl Resource for FetchRequestResource {
682  fn name(&self) -> Cow<'_, str> {
683    "fetchRequest".into()
684  }
685}
686
687pub struct FetchCancelHandle(pub Rc<CancelHandle>);
688
689impl Resource for FetchCancelHandle {
690  fn name(&self) -> Cow<'_, str> {
691    "fetchCancelHandle".into()
692  }
693
694  fn close(self: Rc<Self>) {
695    self.0.cancel()
696  }
697}
698
699type BytesStream =
700  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
701
702pub enum FetchResponseReader {
703  Start(http::Response<ResBody>),
704  BodyReader(Peekable<BytesStream>),
705}
706
707impl Default for FetchResponseReader {
708  fn default() -> Self {
709    let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
710    Self::BodyReader(stream.peekable())
711  }
712}
713#[derive(Debug)]
714pub struct FetchResponseResource {
715  pub response_reader: AsyncRefCell<FetchResponseReader>,
716  pub cancel: CancelHandle,
717  pub size: Option<u64>,
718}
719
720impl FetchResponseResource {
721  pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
722    Self {
723      response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
724      cancel: CancelHandle::default(),
725      size,
726    }
727  }
728
729  pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
730    let reader = self.response_reader.into_inner();
731    match reader {
732      FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
733      _ => unreachable!(),
734    }
735  }
736}
737
738impl Resource for FetchResponseResource {
739  fn name(&self) -> Cow<'_, str> {
740    "fetchResponse".into()
741  }
742
743  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
744    Box::pin(async move {
745      let mut reader =
746        RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
747
748      let body = loop {
749        match &mut *reader {
750          FetchResponseReader::BodyReader(reader) => break reader,
751          FetchResponseReader::Start(_) => {}
752        }
753
754        match std::mem::take(&mut *reader) {
755          FetchResponseReader::Start(resp) => {
756            let stream: BytesStream = Box::pin(
757              resp
758                .into_body()
759                .into_data_stream()
760                .map(|r| r.map_err(std::io::Error::other)),
761            );
762            *reader = FetchResponseReader::BodyReader(stream.peekable());
763          }
764          FetchResponseReader::BodyReader(_) => unreachable!(),
765        }
766      };
767      let fut = async move {
768        let mut reader = Pin::new(body);
769        loop {
770          match reader.as_mut().peek_mut().await {
771            Some(Ok(chunk)) if !chunk.is_empty() => {
772              let len = min(limit, chunk.len());
773              let chunk = chunk.split_to(len);
774              break Ok(chunk.into());
775            }
776            // This unwrap is safe because `peek_mut()` returned `Some`, and thus
777            // currently has a peeked value that can be synchronously returned
778            // from `next()`.
779            //
780            // The future returned from `next()` is always ready, so we can
781            // safely call `await` on it without creating a race condition.
782            Some(_) => match reader.as_mut().next().await.unwrap() {
783              Ok(chunk) => assert!(chunk.is_empty()),
784              Err(err) => break Err(JsErrorBox::type_error(err.to_string())),
785            },
786            None => break Ok(BufView::empty()),
787          }
788        }
789      };
790
791      let cancel_handle = RcRef::map(self, |r| &r.cancel);
792      fut
793        .try_or_cancel(cancel_handle)
794        .await
795        .map_err(JsErrorBox::from_err)
796    })
797  }
798
799  fn size_hint(&self) -> (u64, Option<u64>) {
800    (self.size.unwrap_or(0), self.size)
801  }
802
803  fn close(self: Rc<Self>) {
804    self.cancel.cancel()
805  }
806}
807
808pub struct HttpClientResource {
809  pub client: Client,
810  pub allow_host: bool,
811}
812
813impl Resource for HttpClientResource {
814  fn name(&self) -> Cow<'_, str> {
815    "httpClient".into()
816  }
817}
818
819impl HttpClientResource {
820  fn new(client: Client, allow_host: bool) -> Self {
821    Self { client, allow_host }
822  }
823}
824
825#[derive(Debug, FromV8)]
826pub struct CreateHttpClientArgs {
827  ca_certs: Vec<String>,
828  #[from_v8(serde)]
829  proxy: Option<Proxy>,
830  pool_max_idle_per_host: Option<usize>,
831  #[from_v8(serde)]
832  pool_idle_timeout: Option<serde_json::Value>,
833  #[from_v8(default = true)]
834  http1: bool,
835  #[from_v8(default = true)]
836  http2: bool,
837  #[from_v8(default)]
838  allow_host: bool,
839  local_address: Option<String>,
840}
841
842#[op2(stack_trace)]
843#[smi]
844#[allow(clippy::result_large_err, reason = "TODO: investigate")]
845pub fn op_fetch_custom_client(
846  state: &mut OpState,
847  #[scoped] mut args: CreateHttpClientArgs,
848  #[cppgc] tls_keys: &TlsKeysHolder,
849) -> Result<ResourceId, FetchError> {
850  if let Some(proxy) = &mut args.proxy {
851    let permissions = state.borrow_mut::<PermissionsContainer>();
852    match proxy {
853      Proxy::Http { url, .. } => {
854        let url = Url::parse(url)?;
855        permissions.check_net_url(&url, "Deno.createHttpClient()")?;
856      }
857      Proxy::Tcp { hostname, port } => {
858        permissions
859          .check_net(&(hostname, Some(*port)), "Deno.createHttpClient()")?;
860      }
861      Proxy::Unix {
862        path: original_path,
863      } => {
864        let path = Path::new(original_path);
865        let resolved_path = permissions
866          .check_open(
867            Cow::Borrowed(path),
868            OpenAccessKind::ReadWriteNoFollow,
869            Some("Deno.createHttpClient()"),
870          )?
871          .into_path();
872        if path != resolved_path {
873          *original_path = resolved_path.to_string_lossy().into_owned();
874        }
875      }
876      Proxy::Vsock { cid, port } => {
877        let permissions = state.borrow_mut::<PermissionsContainer>();
878        permissions.check_net_vsock(*cid, *port, "Deno.createHttpClient()")?;
879      }
880    }
881  }
882
883  let options = state.borrow::<Options>();
884  let ca_certs = args
885    .ca_certs
886    .into_iter()
887    .map(|cert| cert.into_bytes())
888    .collect::<Vec<_>>();
889
890  let client = create_http_client(
891    &options.user_agent,
892    CreateHttpClientOptions {
893      root_cert_store: options
894        .root_cert_store()
895        .map_err(HttpClientCreateError::RootCertStore)?,
896      ca_certs,
897      proxy: args.proxy,
898      dns_resolver: dns::Resolver::default(),
899      unsafely_ignore_certificate_errors: options
900        .unsafely_ignore_certificate_errors
901        .clone(),
902      client_cert_chain_and_key: tls_keys.take().try_into().unwrap(),
903      pool_max_idle_per_host: args.pool_max_idle_per_host,
904      pool_idle_timeout: args.pool_idle_timeout.and_then(
905        |timeout| match timeout {
906          serde_json::Value::Bool(true) => None,
907          serde_json::Value::Bool(false) => Some(None),
908          serde_json::Value::Number(specify) => {
909            Some(Some(specify.as_u64().unwrap_or_default()))
910          }
911          _ => Some(None),
912        },
913      ),
914      http1: args.http1,
915      http2: args.http2,
916      local_address: args.local_address,
917      client_builder_hook: options.client_builder_hook,
918    },
919  )?;
920
921  let rid = state
922    .resource_table
923    .add(HttpClientResource::new(client, args.allow_host));
924  Ok(rid)
925}
926
927#[derive(Debug, Clone)]
928pub struct CreateHttpClientOptions {
929  pub root_cert_store: Option<RootCertStore>,
930  pub ca_certs: Vec<Vec<u8>>,
931  pub proxy: Option<Proxy>,
932  pub dns_resolver: dns::Resolver,
933  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
934  pub client_cert_chain_and_key: Option<TlsKey>,
935  pub pool_max_idle_per_host: Option<usize>,
936  pub pool_idle_timeout: Option<Option<u64>>,
937  pub http1: bool,
938  pub http2: bool,
939  pub local_address: Option<String>,
940  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
941}
942
943impl Default for CreateHttpClientOptions {
944  fn default() -> Self {
945    CreateHttpClientOptions {
946      root_cert_store: None,
947      ca_certs: vec![],
948      proxy: None,
949      dns_resolver: dns::Resolver::default(),
950      unsafely_ignore_certificate_errors: None,
951      client_cert_chain_and_key: None,
952      pool_max_idle_per_host: None,
953      pool_idle_timeout: None,
954      http1: true,
955      http2: true,
956      local_address: None,
957      client_builder_hook: None,
958    }
959  }
960}
961
962#[derive(Debug, thiserror::Error, deno_error::JsError)]
963#[class(type)]
964pub enum HttpClientCreateError {
965  #[error(transparent)]
966  Tls(deno_tls::TlsError),
967  #[error("Illegal characters in User-Agent: received {0}")]
968  InvalidUserAgent(String),
969  #[error("Invalid address: {0}")]
970  InvalidAddress(String),
971  #[error("invalid proxy url")]
972  InvalidProxyUrl,
973  #[error(
974    "Cannot create Http Client: either `http1` or `http2` needs to be set to true"
975  )]
976  HttpVersionSelectionInvalid,
977  #[class(inherit)]
978  #[error(transparent)]
979  RootCertStore(JsErrorBox),
980  #[error("Unix proxy is not supported on Windows")]
981  UnixProxyNotSupportedOnWindows,
982  #[error("Vsock proxy is not supported on this platform")]
983  VsockProxyNotSupported,
984}
985
986/// Create new instance of async Client. This client supports
987/// proxies and doesn't follow redirects.
988pub fn create_http_client(
989  user_agent: &str,
990  options: CreateHttpClientOptions,
991) -> Result<Client, HttpClientCreateError> {
992  let mut tls_config =
993    deno_tls::create_client_config(deno_tls::TlsClientConfigOptions {
994      root_cert_store: options.root_cert_store,
995      ca_certs: options.ca_certs,
996      unsafely_ignore_certificate_errors: options
997        .unsafely_ignore_certificate_errors,
998      unsafely_disable_hostname_verification: false,
999      cert_chain_and_key: options.client_cert_chain_and_key.into(),
1000      socket_use: deno_tls::SocketUse::Http,
1001    })
1002    .map_err(HttpClientCreateError::Tls)?;
1003
1004  // Proxy TLS should not send ALPN
1005  tls_config.alpn_protocols.clear();
1006  let proxy_tls_config = Arc::from(tls_config.clone());
1007
1008  let mut alpn_protocols = vec![];
1009  if options.http2 {
1010    alpn_protocols.push("h2".into());
1011  }
1012  if options.http1 {
1013    alpn_protocols.push("http/1.1".into());
1014  }
1015  tls_config.alpn_protocols = alpn_protocols;
1016  let tls_config = Arc::from(tls_config);
1017
1018  let mut http_connector =
1019    HttpConnector::new_with_resolver(options.dns_resolver.clone());
1020  http_connector.enforce_http(false);
1021  if let Some(local_address) = options.local_address {
1022    let local_addr = local_address
1023      .parse::<IpAddr>()
1024      .map_err(|_| HttpClientCreateError::InvalidAddress(local_address))?;
1025    http_connector.set_local_address(Some(local_addr));
1026  }
1027
1028  let user_agent = user_agent.parse::<HeaderValue>().map_err(|_| {
1029    HttpClientCreateError::InvalidUserAgent(user_agent.to_string())
1030  })?;
1031
1032  let mut builder = HyperClientBuilder::new(TokioExecutor::new());
1033  builder.timer(TokioTimer::new());
1034  builder.pool_timer(TokioTimer::new());
1035
1036  if let Some(client_builder_hook) = options.client_builder_hook {
1037    builder = client_builder_hook(builder);
1038  }
1039
1040  let mut proxies = proxy::from_env();
1041  if let Some(proxy) = options.proxy {
1042    let intercept = match proxy {
1043      Proxy::Http { url, basic_auth } => {
1044        let target = proxy::Target::parse(&url)
1045          .ok_or_else(|| HttpClientCreateError::InvalidProxyUrl)?;
1046        let mut intercept = proxy::Intercept::all(target);
1047        if let Some(basic_auth) = &basic_auth {
1048          intercept.set_auth(&basic_auth.username, &basic_auth.password);
1049        }
1050        intercept
1051      }
1052      Proxy::Tcp {
1053        hostname: host,
1054        port,
1055      } => {
1056        let target = proxy::Target::new_tcp(host, port);
1057        proxy::Intercept::all(target)
1058      }
1059      #[cfg(not(windows))]
1060      Proxy::Unix { path } => {
1061        let target = proxy::Target::new_unix(PathBuf::from(path));
1062        proxy::Intercept::all(target)
1063      }
1064      #[cfg(windows)]
1065      Proxy::Unix { .. } => {
1066        return Err(HttpClientCreateError::UnixProxyNotSupportedOnWindows);
1067      }
1068      #[cfg(any(
1069        target_os = "android",
1070        target_os = "linux",
1071        target_os = "macos"
1072      ))]
1073      Proxy::Vsock { cid, port } => {
1074        let target = proxy::Target::new_vsock(cid, port);
1075        proxy::Intercept::all(target)
1076      }
1077      #[cfg(not(any(
1078        target_os = "android",
1079        target_os = "linux",
1080        target_os = "macos"
1081      )))]
1082      Proxy::Vsock { .. } => {
1083        return Err(HttpClientCreateError::VsockProxyNotSupported);
1084      }
1085    };
1086    proxies.prepend(intercept);
1087  }
1088  let proxies = Arc::new(proxies);
1089  let connector = proxy::ProxyConnector {
1090    http: http_connector,
1091    proxies,
1092    tls: tls_config,
1093    tls_proxy: proxy_tls_config,
1094    user_agent: Some(user_agent.clone()),
1095  };
1096
1097  if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
1098    builder.pool_max_idle_per_host(pool_max_idle_per_host);
1099  }
1100
1101  if let Some(pool_idle_timeout) = options.pool_idle_timeout {
1102    builder.pool_idle_timeout(
1103      pool_idle_timeout.map(std::time::Duration::from_millis),
1104    );
1105  }
1106
1107  match (options.http1, options.http2) {
1108    (true, false) => {} // noop, handled by ALPN above
1109    (false, true) => {
1110      builder.http2_only(true);
1111    }
1112    (true, true) => {}
1113    (false, false) => {
1114      return Err(HttpClientCreateError::HttpVersionSelectionInvalid);
1115    }
1116  }
1117
1118  let pooled_client = builder.build(connector.clone());
1119  let retry_client = retry::Retry::new(FetchRetry, pooled_client);
1120  let decompress = Decompression::new(retry_client).gzip(true).br(true);
1121
1122  Ok(Client {
1123    inner: decompress,
1124    connector,
1125    user_agent,
1126  })
1127}
1128
1129#[op2]
1130pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString {
1131  input.into()
1132}
1133
/// HTTP client produced by `create_http_client`.
#[derive(Clone, Debug)]
pub struct Client {
  /// Request pipeline: the pooled hyper client wrapped in the `FetchRetry`
  /// retry layer, wrapped in transparent response decompression.
  inner: Decompression<
    retry::Retry<
      FetchRetry,
      hyper_util::client::legacy::Client<Connector, ReqBody>,
    >,
  >,
  /// The connector the pooled client was built from; also used directly by
  /// `Client::connect` and to look up proxy authorization.
  connector: Connector,
  /// Default `User-Agent` value injected when a request does not set one.
  user_agent: HeaderValue,
}
1145
/// Errors returned by `Client::connect`.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum ClientConnectError {
  /// An HTTP/1-only connection was requested but the connector could not be
  /// restricted to HTTP/1.
  #[class(type)]
  #[error("HTTP/1.1 not supported by this client")]
  Http1NotSupported,
  /// An HTTP/2-only connection was requested but the connector could not be
  /// restricted to HTTP/2.
  #[class(type)]
  #[error("HTTP/2 not supported by this client")]
  Http2NotSupported,
  /// The underlying connector failed to establish the connection.
  #[class(generic)]
  #[error(transparent)]
  Connector(BoxError),
}
1158
1159impl Client {
1160  pub async fn connect(
1161    &self,
1162    uri: Uri,
1163    socket_use: SocketUse,
1164  ) -> Result<
1165    impl tokio::io::AsyncRead
1166    + tokio::io::AsyncWrite
1167    + Connection
1168    + Unpin
1169    + Send
1170    + 'static,
1171    ClientConnectError,
1172  > {
1173    let mut connector = match socket_use {
1174      SocketUse::Http1Only => {
1175        let Some(connector) = self.connector.clone().h1_only() else {
1176          return Err(ClientConnectError::Http1NotSupported);
1177        };
1178        connector
1179      }
1180      SocketUse::Http2Only => {
1181        let Some(connector) = self.connector.clone().h2_only() else {
1182          return Err(ClientConnectError::Http2NotSupported);
1183        };
1184        connector
1185      }
1186      _ => self.connector.clone(),
1187    };
1188    let connection = connector
1189      .call(uri)
1190      .await
1191      .map_err(ClientConnectError::Connector)?;
1192    Ok(TokioIo::new(connection))
1193  }
1194}
1195
/// Connector type used by `Client`: the proxy-aware connector layered over
/// an `HttpConnector` that resolves names with `dns::Resolver`.
type Connector = proxy::ProxyConnector<HttpConnector<dns::Resolver>>;
1197
/// The `*/*` header value, inserted as the `Accept` header when a request
/// does not already carry one.
#[allow(
  clippy::declare_interior_mutable_const,
  reason = "clippy is wrong here"
)]
const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*");
1203
/// Error returned when sending a request through `Client` fails.
#[derive(Debug, deno_error::JsError)]
#[class(type)]
pub struct ClientSendError {
  /// URI of the request that failed; included in the error message.
  uri: Uri,
  /// The underlying hyper client error.
  pub source: hyper_util::client::legacy::Error,
}
1210
1211impl ClientSendError {
1212  pub fn is_connect_error(&self) -> bool {
1213    self.source.is_connect()
1214  }
1215
1216  fn http_info(&self) -> Option<HttpInfo> {
1217    let mut exts = Extensions::new();
1218    self.source.connect_info()?.get_extras(&mut exts);
1219    exts.remove::<HttpInfo>()
1220  }
1221}
1222
1223impl std::fmt::Display for ClientSendError {
1224  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1225    // NOTE: we can use `std::error::Report` instead once it's stabilized.
1226    let detail = error_reporter::Report::new(&self.source);
1227
1228    match self.http_info() {
1229      Some(http_info) => {
1230        write!(
1231          f,
1232          "error sending request from {src} for {uri} ({dst}): {detail}",
1233          src = http_info.local_addr(),
1234          uri = self.uri,
1235          dst = http_info.remote_addr(),
1236          detail = detail,
1237        )
1238      }
1239      None => {
1240        write!(
1241          f,
1242          "error sending request for url ({uri}): {detail}",
1243          uri = self.uri,
1244          detail = detail,
1245        )
1246      }
1247    }
1248  }
1249}
1250
impl std::error::Error for ClientSendError {
  /// Exposes the wrapped hyper client error as the cause, so callers can
  /// walk the error chain.
  fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
    Some(&self.source)
  }
}
1256
/// Minimal view of a request needed by `Client::inject_common_headers`:
/// read access to the URI and mutable access to the headers.
pub trait CommonRequest {
  /// Returns the request URI.
  fn uri(&self) -> &Uri;
  /// Returns the request headers for modification.
  fn headers_mut(&mut self) -> &mut HeaderMap;
}
1261
1262impl CommonRequest for http::Request<ReqBody> {
1263  fn uri(&self) -> &Uri {
1264    self.uri()
1265  }
1266
1267  fn headers_mut(&mut self) -> &mut HeaderMap {
1268    self.headers_mut()
1269  }
1270}
1271
1272impl CommonRequest for http::request::Builder {
1273  fn uri(&self) -> &Uri {
1274    http::request::Builder::uri_ref(self).expect("uri not set")
1275  }
1276
1277  fn headers_mut(&mut self) -> &mut HeaderMap {
1278    http::request::Builder::headers_mut(self).expect("headers not set")
1279  }
1280}
1281
1282impl Client {
1283  /// Injects common headers like User-Agent and Proxy-Authorization.
1284  pub fn inject_common_headers(&self, req: &mut impl CommonRequest) {
1285    req
1286      .headers_mut()
1287      .entry(USER_AGENT)
1288      .or_insert_with(|| self.user_agent.clone());
1289
1290    if let Some(auth) = self.connector.proxies.http_forward_auth(req.uri()) {
1291      req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
1292    }
1293  }
1294
1295  pub async fn send(
1296    self,
1297    mut req: http::Request<ReqBody>,
1298  ) -> Result<http::Response<ResBody>, ClientSendError> {
1299    self.inject_common_headers(&mut req);
1300
1301    req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
1302
1303    let uri = req.uri().clone();
1304
1305    let resp = self
1306      .inner
1307      .oneshot(req)
1308      .await
1309      .map_err(|e| ClientSendError { uri, source: e })?;
1310    Ok(resp.map(|b| b.map_err(|e| JsErrorBox::generic(e.to_string())).boxed()))
1311  }
1312
1313  /// Sends a request bypassing the transparent decompression middleware.
1314  /// The response body will contain raw bytes (potentially compressed).
1315  /// The caller is responsible for checking Content-Encoding and
1316  /// decompressing if needed.
1317  pub async fn send_no_decompress(
1318    self,
1319    mut req: http::Request<ReqBody>,
1320  ) -> Result<http::Response<ResBody>, ClientSendError> {
1321    self.inject_common_headers(&mut req);
1322
1323    req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
1324
1325    let uri = req.uri().clone();
1326
1327    // .into_inner() unwraps the Decompression middleware layer
1328    let resp = self
1329      .inner
1330      .into_inner()
1331      .oneshot(req)
1332      .await
1333      .map_err(|e| ClientSendError { uri, source: e })?;
1334    Ok(resp.map(|b| b.map_err(|e| JsErrorBox::generic(e.to_string())).boxed()))
1335  }
1336}
1337
/// Request body used by `Client`.
///
/// This is a custom enum to allow the retry policy to clone the variants that could be retried.
pub enum ReqBody {
  /// A complete in-memory body; cloned by the retry policy.
  Full(http_body_util::Full<Bytes>),
  /// An empty body; copied by the retry policy.
  Empty(http_body_util::Empty<Bytes>),
  /// A boxed streaming body; requests carrying it are not cloned for retry.
  Streaming(BoxBody<Bytes, JsErrorBox>),
}
1344
/// Response body type returned by `Client::send` and
/// `Client::send_no_decompress`: a boxed body yielding `Bytes` with
/// `JsErrorBox` errors.
pub type ResBody = BoxBody<Bytes, JsErrorBox>;
1346
1347impl ReqBody {
1348  pub fn full(bytes: Bytes) -> Self {
1349    ReqBody::Full(http_body_util::Full::new(bytes))
1350  }
1351
1352  pub fn empty() -> Self {
1353    ReqBody::Empty(http_body_util::Empty::new())
1354  }
1355
1356  pub fn streaming<B>(body: B) -> Self
1357  where
1358    B: hyper::body::Body<Data = Bytes, Error = JsErrorBox>
1359      + Send
1360      + Sync
1361      + 'static,
1362  {
1363    ReqBody::Streaming(BoxBody::new(body))
1364  }
1365}
1366
1367impl hyper::body::Body for ReqBody {
1368  type Data = Bytes;
1369  type Error = JsErrorBox;
1370
1371  fn poll_frame(
1372    mut self: Pin<&mut Self>,
1373    cx: &mut Context<'_>,
1374  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1375    match &mut *self {
1376      ReqBody::Full(b) => {
1377        Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1378      }
1379      ReqBody::Empty(b) => {
1380        Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1381      }
1382      ReqBody::Streaming(b) => Pin::new(b).poll_frame(cx),
1383    }
1384  }
1385
1386  fn is_end_stream(&self) -> bool {
1387    match self {
1388      ReqBody::Full(b) => b.is_end_stream(),
1389      ReqBody::Empty(b) => b.is_end_stream(),
1390      ReqBody::Streaming(b) => b.is_end_stream(),
1391    }
1392  }
1393
1394  fn size_hint(&self) -> hyper::body::SizeHint {
1395    match self {
1396      ReqBody::Full(b) => b.size_hint(),
1397      ReqBody::Empty(b) => b.size_hint(),
1398      ReqBody::Streaming(b) => b.size_hint(),
1399    }
1400  }
1401}
1402
1403/// Copied from https://github.com/seanmonstar/reqwest/blob/b9d62a0323d96f11672a61a17bf8849baec00275/src/async_impl/request.rs#L572
1404/// Check the request URL for a "username:password" type authority, and if
1405/// found, remove it from the URL and return it.
1406pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
1407  use percent_encoding::percent_decode;
1408
1409  if url.has_authority() {
1410    let username: String = percent_decode(url.username().as_bytes())
1411      .decode_utf8()
1412      .ok()?
1413      .into();
1414    let password = url.password().and_then(|pass| {
1415      percent_decode(pass.as_bytes())
1416        .decode_utf8()
1417        .ok()
1418        .map(String::from)
1419    });
1420    if !username.is_empty() || password.is_some() {
1421      url
1422        .set_username("")
1423        .expect("has_authority means set_username shouldn't fail");
1424      url
1425        .set_password(None)
1426        .expect("has_authority means set_password shouldn't fail");
1427      return Some((username, password));
1428    }
1429  }
1430
1431  None
1432}
1433
1434#[op2(fast)]
1435fn op_fetch_promise_is_settled(promise: v8::Local<v8::Promise>) -> bool {
1436  promise.state() != v8::PromiseState::Pending
1437}
1438
/// Deno.fetch's retry policy.
///
/// Stateless marker type; the policy logic lives in its `retry::Policy`
/// implementation.
#[derive(Clone, Debug)]
struct FetchRetry;
1442
/// Marker extension that a request has been retried once.
///
/// Stored in the request's extensions by the retry policy and checked there
/// to prevent a second retry.
#[derive(Clone, Debug)]
struct Retried;
1446
1447impl<ResBody, E>
1448  retry::Policy<http::Request<ReqBody>, http::Response<ResBody>, E>
1449  for FetchRetry
1450where
1451  E: std::error::Error + 'static,
1452{
1453  /// Don't delay retries.
1454  type Future = future::Ready<()>;
1455
1456  fn retry(
1457    &mut self,
1458    req: &mut http::Request<ReqBody>,
1459    result: &mut Result<http::Response<ResBody>, E>,
1460  ) -> Option<Self::Future> {
1461    if req.extensions().get::<Retried>().is_some() {
1462      // only retry once
1463      return None;
1464    }
1465
1466    match result {
1467      Ok(..) => {
1468        // never retry a Response
1469        None
1470      }
1471      Err(err) => {
1472        if is_error_retryable(&*err) {
1473          req.extensions_mut().insert(Retried);
1474          Some(future::ready(()))
1475        } else {
1476          None
1477        }
1478      }
1479    }
1480  }
1481
1482  fn clone_request(
1483    &mut self,
1484    req: &http::Request<ReqBody>,
1485  ) -> Option<http::Request<ReqBody>> {
1486    let body = match req.body() {
1487      ReqBody::Full(b) => ReqBody::Full(b.clone()),
1488      ReqBody::Empty(b) => ReqBody::Empty(*b),
1489      ReqBody::Streaming(..) => return None,
1490    };
1491
1492    let mut clone = http::Request::new(body);
1493    *clone.method_mut() = req.method().clone();
1494    *clone.uri_mut() = req.uri().clone();
1495    *clone.headers_mut() = req.headers().clone();
1496    *clone.extensions_mut() = req.extensions().clone();
1497    Some(clone)
1498  }
1499}
1500
1501fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool {
1502  // Note: hyper doesn't promise it will always be this h2 version. Keep up to date.
1503  if let Some(err) = find_source::<h2::Error>(err) {
1504    // They sent us a graceful shutdown, try with a new connection!
1505    if err.is_go_away()
1506      && err.is_remote()
1507      && err.reason() == Some(h2::Reason::NO_ERROR)
1508    {
1509      return true;
1510    }
1511
1512    // REFUSED_STREAM was sent from the server, which is safe to retry.
1513    // https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2
1514    if err.is_reset()
1515      && err.is_remote()
1516      && err.reason() == Some(h2::Reason::REFUSED_STREAM)
1517    {
1518      return true;
1519    }
1520  }
1521
1522  // HTTP/1.1: The connection was closed before the message completed.
1523  // This happens when a pooled keep-alive connection is stale (e.g. the
1524  // server shut down between requests). Safe to retry because the server
1525  // never received/processed the request on this connection.
1526  if let Some(err) = find_source::<hyper::Error>(err)
1527    && err.is_incomplete_message()
1528  {
1529    return true;
1530  }
1531
1532  // Connection reset/aborted by the server before we could send the request.
1533  // This is another manifestation of stale pooled connections.
1534  // ConnectionReset (ECONNRESET) on Unix, ConnectionAborted (WSAECONNABORTED /
1535  // os error 10053) on Windows.
1536  if let Some(err) = find_source::<std::io::Error>(err)
1537    && matches!(
1538      err.kind(),
1539      std::io::ErrorKind::ConnectionReset
1540        | std::io::ErrorKind::ConnectionAborted
1541    )
1542  {
1543    return true;
1544  }
1545
1546  false
1547}
1548
/// Walks the error chain starting at `err` itself and returns the first
/// error whose concrete type is `E`, or `None` if the chain ends without a
/// match.
fn find_source<'a, E: std::error::Error + 'static>(
  err: &'a (dyn std::error::Error + 'static),
) -> Option<&'a E> {
  let mut current = err;
  loop {
    if let Some(hit) = current.downcast_ref::<E>() {
      return Some(hit);
    }
    // `?` ends the search once there is no further cause.
    current = current.source()?;
  }
}