1pub mod dns;
4mod fs_fetch_handler;
5mod proxy;
6#[cfg(test)]
7mod tests;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::cmp::min;
12use std::convert::From;
13use std::future;
14use std::future::Future;
15use std::path::Path;
16use std::pin::Pin;
17use std::rc::Rc;
18use std::sync::Arc;
19use std::task::Context;
20use std::task::Poll;
21
22use bytes::Bytes;
23pub use data_url;
25use data_url::DataUrl;
26use deno_core::futures::stream::Peekable;
27use deno_core::futures::FutureExt;
28use deno_core::futures::Stream;
29use deno_core::futures::StreamExt;
30use deno_core::futures::TryFutureExt;
31use deno_core::op2;
32use deno_core::url;
33use deno_core::url::Url;
34use deno_core::v8;
35use deno_core::AsyncRefCell;
36use deno_core::AsyncResult;
37use deno_core::BufView;
38use deno_core::ByteString;
39use deno_core::CancelFuture;
40use deno_core::CancelHandle;
41use deno_core::CancelTryFuture;
42use deno_core::Canceled;
43use deno_core::JsBuffer;
44use deno_core::OpState;
45use deno_core::RcRef;
46use deno_core::Resource;
47use deno_core::ResourceId;
48use deno_error::JsErrorBox;
49pub use deno_fs::FsError;
50use deno_path_util::PathToUrlError;
51use deno_permissions::PermissionCheckError;
52use deno_tls::rustls::RootCertStore;
53use deno_tls::Proxy;
54use deno_tls::RootCertStoreProvider;
55use deno_tls::TlsKey;
56use deno_tls::TlsKeys;
57use deno_tls::TlsKeysHolder;
58pub use fs_fetch_handler::FsFetchHandler;
59use http::header::HeaderName;
60use http::header::HeaderValue;
61use http::header::ACCEPT;
62use http::header::ACCEPT_ENCODING;
63use http::header::AUTHORIZATION;
64use http::header::CONTENT_LENGTH;
65use http::header::HOST;
66use http::header::PROXY_AUTHORIZATION;
67use http::header::RANGE;
68use http::header::USER_AGENT;
69use http::Extensions;
70use http::Method;
71use http::Uri;
72use http_body_util::combinators::BoxBody;
73use http_body_util::BodyExt;
74use hyper::body::Frame;
75use hyper_util::client::legacy::connect::HttpConnector;
76use hyper_util::client::legacy::connect::HttpInfo;
77use hyper_util::client::legacy::Builder as HyperClientBuilder;
78use hyper_util::rt::TokioExecutor;
79use hyper_util::rt::TokioTimer;
80pub use proxy::basic_auth;
81use serde::Deserialize;
82use serde::Serialize;
83use tower::retry;
84use tower::ServiceExt;
85use tower_http::decompression::Decompression;
86
/// Configuration of the `deno_fetch` extension.
///
/// The extension stores one instance in `OpState`; it is read when the default
/// HTTP client is created and whenever `op_fetch` dispatches a request.
#[derive(Clone)]
pub struct Options {
  /// Value used for the `User-Agent` header when a request does not set one.
  pub user_agent: String,
  /// Optional provider of the root certificate store handed to the TLS config.
  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
  /// Optional proxy used by the default client.
  pub proxy: Option<Proxy>,
  /// Hook that may customize the hyper client builder.
  ///
  /// `create_http_client` calls the hook before it applies
  /// `pool_max_idle_per_host` and `pool_idle_timeout`, so those explicit
  /// options are set after (and on top of) whatever the hook configured.
  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
  /// Hook called by `op_fetch` on every outgoing `http`/`https` request after
  /// its headers are assembled; an error aborts the fetch.
  #[allow(clippy::type_complexity)]
  pub request_builder_hook:
    Option<fn(&mut http::Request<ReqBody>) -> Result<(), JsErrorBox>>,
  /// Forwarded unchanged to `deno_tls::create_client_config`.
  // NOTE(review): presumably the hosts whose certificate errors are ignored —
  // confirm against deno_tls.
  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
  /// TLS client certificate chain and key for the default client.
  pub client_cert_chain_and_key: TlsKeys,
  /// Handler that `op_fetch` delegates `file:` URLs to.
  pub file_fetch_handler: Rc<dyn FetchHandler>,
  /// DNS resolver used by the default client.
  pub resolver: dns::Resolver,
}
110
111impl Options {
112 pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
113 Ok(match &self.root_cert_store_provider {
114 Some(provider) => Some(provider.get_or_try_init()?.clone()),
115 None => None,
116 })
117 }
118}
119
120impl Default for Options {
121 fn default() -> Self {
122 Self {
123 user_agent: "".to_string(),
124 root_cert_store_provider: None,
125 proxy: None,
126 client_builder_hook: None,
127 request_builder_hook: None,
128 unsafely_ignore_certificate_errors: None,
129 client_cert_chain_and_key: TlsKeys::Null,
130 file_fetch_handler: Rc::new(DefaultFileFetchHandler),
131 resolver: dns::Resolver::default(),
132 }
133 }
134}
135
// Declares the `deno_fetch` extension: the extensions it depends on, the ops
// it registers (generic over the `FetchPermissions` implementation `FP`), the
// JS modules it ships, and the `Options` value that is placed into `OpState`
// when the extension is initialized.
deno_core::extension!(deno_fetch,
  deps = [ deno_webidl, deno_web, deno_url, deno_console ],
  parameters = [FP: FetchPermissions],
  ops = [
    op_fetch<FP>,
    op_fetch_send,
    op_utf8_to_byte_string,
    op_fetch_custom_client<FP>,
    op_fetch_promise_is_settled,
  ],
  esm = [
    "20_headers.js",
    "21_formdata.js",
    "22_body.js",
    "22_http_client.js",
    "23_request.js",
    "23_response.js",
    "26_fetch.js",
    "27_eventsource.js"
  ],
  options = {
    options: Options,
  },
  state = |state, options| {
    state.put::<Options>(options.options);
  },
);
163
/// Errors returned by the fetch ops.
///
/// The `#[class(..)]` attribute on each variant selects the JavaScript error
/// class reported to the runtime; `inherit` reuses the class of the wrapped
/// error.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum FetchError {
  /// A resource id could not be resolved in the resource table.
  #[class(inherit)]
  #[error(transparent)]
  Resource(#[from] deno_core::error::ResourceError),
  /// A permission check failed.
  #[class(inherit)]
  #[error(transparent)]
  Permission(#[from] PermissionCheckError),
  /// Generic fetch failure; also used for filesystem errors other than
  /// `FsError::NotCapable` (see the `From<FsError>` impl).
  #[class(type)]
  #[error("NetworkError when attempting to fetch resource")]
  NetworkError,
  /// A `file:` URL was requested with a method other than `GET`.
  #[class(type)]
  #[error("Fetching files only supports the GET method: received {0}")]
  FsNotGet(Method),
  /// A path could not be converted to a URL.
  #[class(inherit)]
  #[error(transparent)]
  PathToUrl(#[from] PathToUrlError),
  /// The URL could not be converted into an `http::Uri`.
  #[class(type)]
  #[error("Invalid URL {0}")]
  InvalidUrl(Url),
  /// A request header name was not valid.
  #[class(type)]
  #[error(transparent)]
  InvalidHeaderName(#[from] http::header::InvalidHeaderName),
  /// A request header value was not valid.
  #[class(type)]
  #[error(transparent)]
  InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
  /// A `data:` URL could not be parsed.
  #[class(type)]
  #[error("{0:?}")]
  DataUrl(data_url::DataUrlError),
  /// The body of a `data:` URL could not be decoded.
  #[class(type)]
  #[error("{0:?}")]
  Base64(data_url::forgiving_base64::InvalidBase64),
  /// A `blob:` URL reached `op_fetch`, which never resolves blob URLs itself.
  #[class(type)]
  #[error("Blob for the given URL not found.")]
  BlobNotFound,
  /// The URL scheme is not one of `file`, `http`, `https`, `data` or `blob`.
  #[class(type)]
  #[error("Url scheme '{0}' not supported")]
  SchemeNotSupported(String),
  /// The request future was canceled before it produced a response.
  #[class(type)]
  #[error("Request was cancelled")]
  RequestCanceled,
  /// Building an `http` request or response failed.
  #[class(generic)]
  #[error(transparent)]
  Http(#[from] http::Error),
  /// The HTTP client could not be created.
  #[class(inherit)]
  #[error(transparent)]
  ClientCreate(#[from] HttpClientCreateError),
  /// A URL string could not be parsed.
  #[class(inherit)]
  #[error(transparent)]
  Url(#[from] url::ParseError),
  /// The request method was not a valid HTTP method.
  #[class(type)]
  #[error(transparent)]
  Method(#[from] http::method::InvalidMethod),
  /// Sending the request through the client failed.
  #[class(inherit)]
  #[error(transparent)]
  ClientSend(#[from] ClientSendError),
  /// The embedder's `request_builder_hook` returned an error.
  #[class(inherit)]
  #[error(transparent)]
  RequestBuilderHook(JsErrorBox),
  /// An I/O error.
  #[class(inherit)]
  #[error(transparent)]
  Io(#[from] std::io::Error),
  /// Creating the hickory DNS resolver failed.
  #[class(generic)]
  #[error(transparent)]
  Dns(hickory_resolver::ResolveError),
  /// A required capability is missing; the payload names it.
  #[class("NotCapable")]
  #[error("requires {0} access")]
  NotCapable(&'static str),
}
233
234impl From<deno_fs::FsError> for FetchError {
235 fn from(value: deno_fs::FsError) -> Self {
236 match value {
237 deno_fs::FsError::Io(_)
238 | deno_fs::FsError::FileBusy
239 | deno_fs::FsError::NotSupported => FetchError::NetworkError,
240 deno_fs::FsError::NotCapable(err) => FetchError::NotCapable(err),
241 }
242 }
243}
244
/// Boxed, pinned future that resolves to a [`CancelableResponseResult`].
pub type CancelableResponseFuture =
  Pin<Box<dyn Future<Output = CancelableResponseResult>>>;
247
/// Strategy for serving `file:` URLs; `op_fetch` delegates to the handler
/// stored in [`Options::file_fetch_handler`].
pub trait FetchHandler: dyn_clone::DynClone {
  /// Starts fetching `url`.
  ///
  /// Returns the future that resolves to the response, plus an optional
  /// cancel handle. When a handle is returned, `op_fetch` registers it as a
  /// resource so the request can be canceled.
  fn fetch_file(
    &self,
    state: &mut OpState,
    url: &Url,
  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
}

// Makes boxed `FetchHandler` trait objects cloneable via `dyn_clone`.
dyn_clone::clone_trait_object!(FetchHandler);
260
261#[derive(Clone)]
263pub struct DefaultFileFetchHandler;
264
265impl FetchHandler for DefaultFileFetchHandler {
266 fn fetch_file(
267 &self,
268 _state: &mut OpState,
269 _url: &Url,
270 ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
271 let fut = async move { Ok(Err(FetchError::NetworkError)) };
272 (Box::pin(fut), None)
273 }
274}
275
/// Value returned by `op_fetch`; serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
  /// Resource id of the pending `FetchRequestResource`, to be passed to
  /// `op_fetch_send`.
  pub request_rid: ResourceId,
  /// Resource id of the cancel handle, when the request can be canceled.
  pub cancel_handle_rid: Option<ResourceId>,
}
282
283pub fn get_or_create_client_from_state(
284 state: &mut OpState,
285) -> Result<Client, HttpClientCreateError> {
286 if let Some(client) = state.try_borrow::<Client>() {
287 Ok(client.clone())
288 } else {
289 let options = state.borrow::<Options>();
290 let client = create_client_from_options(options)?;
291 state.put::<Client>(client.clone());
292 Ok(client)
293 }
294}
295
296pub fn create_client_from_options(
297 options: &Options,
298) -> Result<Client, HttpClientCreateError> {
299 create_http_client(
300 &options.user_agent,
301 CreateHttpClientOptions {
302 root_cert_store: options
303 .root_cert_store()
304 .map_err(HttpClientCreateError::RootCertStore)?,
305 ca_certs: vec![],
306 proxy: options.proxy.clone(),
307 dns_resolver: options.resolver.clone(),
308 unsafely_ignore_certificate_errors: options
309 .unsafely_ignore_certificate_errors
310 .clone(),
311 client_cert_chain_and_key: options
312 .client_cert_chain_and_key
313 .clone()
314 .try_into()
315 .unwrap_or_default(),
316 pool_max_idle_per_host: None,
317 pool_idle_timeout: None,
318 http1: true,
319 http2: true,
320 client_builder_hook: options.client_builder_hook,
321 },
322 )
323}
324
/// Adapts a readable `Resource` into a byte stream / HTTP request body.
///
/// Field `0` is the resource being read. Field `1` is the read currently in
/// flight; it is `None` once the resource reported end-of-data or an error.
#[allow(clippy::type_complexity)]
pub struct ResourceToBodyAdapter(
  Rc<dyn Resource>,
  Option<Pin<Box<dyn Future<Output = Result<BufView, JsErrorBox>>>>>,
);
330
331impl ResourceToBodyAdapter {
332 pub fn new(resource: Rc<dyn Resource>) -> Self {
333 let future = resource.clone().read(64 * 1024);
334 Self(resource, Some(future))
335 }
336}
337
338unsafe impl Send for ResourceToBodyAdapter {}
340unsafe impl Sync for ResourceToBodyAdapter {}
342
343impl Stream for ResourceToBodyAdapter {
344 type Item = Result<Bytes, JsErrorBox>;
345
346 fn poll_next(
347 self: Pin<&mut Self>,
348 cx: &mut Context<'_>,
349 ) -> Poll<Option<Self::Item>> {
350 let this = self.get_mut();
351 if let Some(mut fut) = this.1.take() {
352 match fut.poll_unpin(cx) {
353 Poll::Pending => {
354 this.1 = Some(fut);
355 Poll::Pending
356 }
357 Poll::Ready(res) => match res {
358 Ok(buf) if buf.is_empty() => Poll::Ready(None),
359 Ok(buf) => {
360 this.1 = Some(this.0.clone().read(64 * 1024));
361 Poll::Ready(Some(Ok(buf.to_vec().into())))
362 }
363 Err(err) => Poll::Ready(Some(Err(err))),
364 },
365 }
366 } else {
367 Poll::Ready(None)
368 }
369 }
370}
371
372impl hyper::body::Body for ResourceToBodyAdapter {
373 type Data = Bytes;
374 type Error = JsErrorBox;
375
376 fn poll_frame(
377 self: Pin<&mut Self>,
378 cx: &mut Context<'_>,
379 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
380 match self.poll_next(cx) {
381 Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
382 Poll::Ready(None) => Poll::Ready(None),
383 Poll::Pending => Poll::Pending,
384 }
385 }
386}
387
388impl Drop for ResourceToBodyAdapter {
389 fn drop(&mut self) {
390 self.0.clone().close()
391 }
392}
393
/// Permission checks required by the fetch ops.
pub trait FetchPermissions {
  /// Checks that network access to `url` is allowed. `api_name` names the
  /// calling API for error reporting.
  fn check_net_url(
    &mut self,
    url: &Url,
    api_name: &str,
  ) -> Result<(), PermissionCheckError>;
  /// Checks that reading path `p` is allowed and returns the path callers
  /// should use afterwards.
  // NOTE(review): `resolved` presumably means `p` has already been resolved
  // by the caller; the `PermissionsContainer` implementation then only
  // performs the special-file check — confirm the intended contract.
  #[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"]
  fn check_read<'a>(
    &mut self,
    resolved: bool,
    p: &'a Path,
    api_name: &str,
  ) -> Result<Cow<'a, Path>, FsError>;
}
408
409impl FetchPermissions for deno_permissions::PermissionsContainer {
410 #[inline(always)]
411 fn check_net_url(
412 &mut self,
413 url: &Url,
414 api_name: &str,
415 ) -> Result<(), PermissionCheckError> {
416 deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
417 }
418
419 #[inline(always)]
420 fn check_read<'a>(
421 &mut self,
422 resolved: bool,
423 path: &'a Path,
424 api_name: &str,
425 ) -> Result<Cow<'a, Path>, FsError> {
426 if resolved {
427 self
428 .check_special_file(path, api_name)
429 .map_err(FsError::NotCapable)?;
430 return Ok(Cow::Borrowed(path));
431 }
432
433 deno_permissions::PermissionsContainer::check_read_path(
434 self,
435 path,
436 Some(api_name),
437 )
438 .map_err(|_| FsError::NotCapable("read"))
439 }
440}
441
// Starts a fetch for `url` and returns the resource id of the pending request
// (to be awaited via `op_fetch_send`) plus, where available, the resource id
// of a cancel handle.
//
// - `method` / `url` / `headers`: the request line and headers from JS.
// - `client_rid`: optional custom client resource; otherwise the client
//   cached in `state` is used (and created on first use).
// - `has_body`, `data`, `resource`: when `has_body` is set, the body is taken
//   from `data` if present, otherwise streamed from `resource`.
//
// Supported schemes are `file`, `http`/`https` and `data`; `blob` and any
// other scheme produce an error.
#[op2(stack_trace)]
#[serde]
#[allow(clippy::too_many_arguments)]
pub fn op_fetch<FP>(
  state: &mut OpState,
  #[serde] method: ByteString,
  #[string] url: String,
  #[serde] headers: Vec<(ByteString, ByteString)>,
  #[smi] client_rid: Option<u32>,
  has_body: bool,
  #[buffer] data: Option<JsBuffer>,
  #[smi] resource: Option<ResourceId>,
) -> Result<FetchReturn, FetchError>
where
  FP: FetchPermissions + 'static,
{
  // `allow_host` (only configurable on custom clients) decides whether a
  // caller-supplied `Host` header is forwarded.
  let (client, allow_host) = if let Some(rid) = client_rid {
    let r = state.resource_table.get::<HttpClientResource>(rid)?;
    (r.client.clone(), r.allow_host)
  } else {
    (get_or_create_client_from_state(state)?, false)
  };

  let method = Method::from_bytes(&method)?;
  let mut url = Url::parse(&url)?;

  // Dispatch on the scheme first; the net permission is only checked in the
  // `http`/`https` branch.
  let scheme = url.scheme();
  let (request_rid, cancel_handle_rid) = match scheme {
    "file" => {
      if method != Method::GET {
        return Err(FetchError::FsNotGet(method));
      }
      // Clone the handler out of `Options` so that `state` can be passed to
      // it mutably.
      let Options {
        file_fetch_handler, ..
      } = state.borrow_mut::<Options>();
      let file_fetch_handler = file_fetch_handler.clone();
      let (future, maybe_cancel_handle) =
        file_fetch_handler.fetch_file(state, &url);
      let request_rid = state
        .resource_table
        .add(FetchRequestResource { future, url });
      let maybe_cancel_handle_rid = maybe_cancel_handle
        .map(|ch| state.resource_table.add(FetchCancelHandle(ch)));

      (request_rid, maybe_cancel_handle_rid)
    }
    "http" | "https" => {
      let permissions = state.borrow_mut::<FP>();
      permissions.check_net_url(&url, "fetch()")?;

      // Credentials embedded in the URL, if any, become a basic
      // `Authorization` header below.
      let maybe_authority = extract_authority(&mut url);
      let uri = url
        .as_str()
        .parse::<Uri>()
        .map_err(|_| FetchError::InvalidUrl(url.clone()))?;

      let mut con_len = None;
      let body = if has_body {
        match (data, resource) {
          (Some(data), _) => {
            // An in-memory body takes precedence over a resource; its length
            // is known up front.
            con_len = Some(data.len() as u64);

            ReqBody::full(data.to_vec().into())
          }
          (_, Some(resource)) => {
            let resource = state.resource_table.take_any(resource)?;
            // Only advertise a length when the resource reports an exact,
            // non-zero size.
            match resource.size_hint() {
              (body_size, Some(n)) if body_size == n && body_size > 0 => {
                con_len = Some(body_size);
              }
              _ => {}
            }
            ReqBody::streaming(ResourceToBodyAdapter::new(resource))
          }
          // `has_body` without either source is treated as a caller bug.
          (None, None) => unreachable!(),
        }
      } else {
        // Body-less POST and PUT requests still get `Content-Length: 0`.
        if matches!(method, Method::POST | Method::PUT) {
          con_len = Some(0);
        }
        ReqBody::empty()
      };

      let mut request = http::Request::new(body);
      *request.method_mut() = method.clone();
      *request.uri_mut() = uri.clone();

      if let Some((username, password)) = maybe_authority {
        request.headers_mut().insert(
          AUTHORIZATION,
          proxy::basic_auth(&username, password.as_deref()),
        );
      }
      if let Some(len) = con_len {
        request.headers_mut().insert(CONTENT_LENGTH, len.into());
      }

      for (key, value) in headers {
        let name = HeaderName::from_bytes(&key)?;
        let v = HeaderValue::from_bytes(&value)?;

        // Caller-supplied `Content-Length` is always dropped; `Host` is only
        // forwarded when the client allows it.
        if (name != HOST || allow_host) && name != CONTENT_LENGTH {
          request.headers_mut().append(name, v);
        }
      }

      // A request carrying `Range` is sent with `Accept-Encoding: identity`.
      if request.headers().contains_key(RANGE) {
        request
          .headers_mut()
          .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
      }

      let options = state.borrow::<Options>();
      if let Some(request_builder_hook) = options.request_builder_hook {
        request_builder_hook(&mut request)
          .map_err(FetchError::RequestBuilderHook)?;
      }

      let cancel_handle = CancelHandle::new_rc();
      let cancel_handle_ = cancel_handle.clone();

      // The request is only sent once this future is awaited by
      // `op_fetch_send`.
      let fut = async move {
        client
          .send(request)
          .map_err(Into::into)
          .or_cancel(cancel_handle_)
          .await
      };

      let request_rid = state.resource_table.add(FetchRequestResource {
        future: Box::pin(fut),
        url,
      });

      let cancel_handle_rid =
        state.resource_table.add(FetchCancelHandle(cancel_handle));

      (request_rid, Some(cancel_handle_rid))
    }
    "data" => {
      // `data:` URLs are decoded eagerly and served as a ready `200 OK`
      // response with the URL's MIME type.
      let data_url =
        DataUrl::process(url.as_str()).map_err(FetchError::DataUrl)?;

      let (body, _) = data_url.decode_to_vec().map_err(FetchError::Base64)?;
      let body = http_body_util::Full::new(body.into())
        .map_err(|never| match never {})
        .boxed();

      let response = http::Response::builder()
        .status(http::StatusCode::OK)
        .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
        .body(body)?;

      let fut = async move { Ok(Ok(response)) };

      let request_rid = state.resource_table.add(FetchRequestResource {
        future: Box::pin(fut),
        url,
      });

      (request_rid, None)
    }
    "blob" => {
      // This op never resolves blob URLs.
      // NOTE(review): presumably object URLs are resolved on the JS side
      // before this op is reached — confirm.
      return Err(FetchError::BlobNotFound);
    }
    _ => return Err(FetchError::SchemeNotSupported(scheme.to_string())),
  };

  Ok(FetchReturn {
    request_rid,
    cancel_handle_rid,
  })
}
623
/// Response metadata returned by `op_fetch_send`; serialized with camelCase
/// keys.
#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
  /// Numeric HTTP status code.
  pub status: u16,
  /// Canonical reason phrase for `status`, or an empty string if none exists.
  pub status_text: String,
  /// Response headers as raw name/value byte strings.
  pub headers: Vec<(ByteString, ByteString)>,
  /// URL stored on the request resource.
  pub url: String,
  /// Resource id of the `FetchResponseResource` holding the response body.
  pub response_rid: ResourceId,
  /// Exact body length, when the body reports one.
  pub content_length: Option<u64>,
  /// IP address of the remote peer, when connection info is available.
  pub remote_addr_ip: Option<String>,
  /// Port of the remote peer, when connection info is available.
  pub remote_addr_port: Option<u16>,
  /// Set instead of failing the op when a send error has an underlying cause
  /// that can be recovered: the first element is the error message, the
  /// second is the message of the cause. All other fields then hold their
  /// default values.
  pub error: Option<(String, String)>,
}
641
642#[op2(async)]
643#[serde]
644pub async fn op_fetch_send(
645 state: Rc<RefCell<OpState>>,
646 #[smi] rid: ResourceId,
647) -> Result<FetchResponse, FetchError> {
648 let request = state
649 .borrow_mut()
650 .resource_table
651 .take::<FetchRequestResource>(rid)?;
652
653 let request = Rc::try_unwrap(request)
654 .ok()
655 .expect("multiple op_fetch_send ongoing");
656
657 let res = match request.future.await {
658 Ok(Ok(res)) => res,
659 Ok(Err(err)) => {
660 if let FetchError::ClientSend(err_src) = &err {
666 if let Some(client_err) = std::error::Error::source(&err_src.source) {
667 if let Some(err_src) = client_err.downcast_ref::<hyper::Error>() {
668 if let Some(err_src) = std::error::Error::source(err_src) {
669 return Ok(FetchResponse {
670 error: Some((err.to_string(), err_src.to_string())),
671 ..Default::default()
672 });
673 }
674 }
675 }
676 }
677
678 return Err(err);
679 }
680 Err(_) => return Err(FetchError::RequestCanceled),
681 };
682
683 let status = res.status();
684 let url = request.url.into();
685 let mut res_headers = Vec::new();
686 for (key, val) in res.headers().iter() {
687 res_headers.push((key.as_str().into(), val.as_bytes().into()));
688 }
689
690 let content_length = hyper::body::Body::size_hint(res.body()).exact();
691 let remote_addr = res
692 .extensions()
693 .get::<hyper_util::client::legacy::connect::HttpInfo>()
694 .map(|info| info.remote_addr());
695 let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr {
696 (Some(addr.ip().to_string()), Some(addr.port()))
697 } else {
698 (None, None)
699 };
700
701 let response_rid = state
702 .borrow_mut()
703 .resource_table
704 .add(FetchResponseResource::new(res, content_length));
705
706 Ok(FetchResponse {
707 status: status.as_u16(),
708 status_text: status.canonical_reason().unwrap_or("").to_string(),
709 headers: res_headers,
710 url,
711 response_rid,
712 content_length,
713 remote_addr_ip,
714 remote_addr_port,
715 error: None,
716 })
717}
718
/// Outcome of a fetch future: the outer `Result` is `Err(Canceled)` when the
/// request was canceled; the inner one carries the response or the fetch
/// error.
type CancelableResponseResult =
  Result<Result<http::Response<ResBody>, FetchError>, Canceled>;
721
722pub struct FetchRequestResource {
723 pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
724 pub url: Url,
725}
726
727impl Resource for FetchRequestResource {
728 fn name(&self) -> Cow<str> {
729 "fetchRequest".into()
730 }
731}
732
733pub struct FetchCancelHandle(pub Rc<CancelHandle>);
734
735impl Resource for FetchCancelHandle {
736 fn name(&self) -> Cow<str> {
737 "fetchCancelHandle".into()
738 }
739
740 fn close(self: Rc<Self>) {
741 self.0.cancel()
742 }
743}
744
/// Boxed stream of response body chunks with `std::io::Error` as the error
/// type.
type BytesStream =
  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
747
748pub enum FetchResponseReader {
749 Start(http::Response<ResBody>),
750 BodyReader(Peekable<BytesStream>),
751}
752
753impl Default for FetchResponseReader {
754 fn default() -> Self {
755 let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
756 Self::BodyReader(stream.peekable())
757 }
758}
/// Resource holding a fetch response and its (lazily created) body reader.
#[derive(Debug)]
pub struct FetchResponseResource {
  /// Response / body reader state, guarded for async access.
  pub response_reader: AsyncRefCell<FetchResponseReader>,
  /// Handle used to cancel pending reads; triggered when the resource closes.
  pub cancel: CancelHandle,
  /// Exact body size, when known.
  pub size: Option<u64>,
}
765
766impl FetchResponseResource {
767 pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
768 Self {
769 response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
770 cancel: CancelHandle::default(),
771 size,
772 }
773 }
774
775 pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
776 let reader = self.response_reader.into_inner();
777 match reader {
778 FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
779 _ => unreachable!(),
780 }
781 }
782}
783
impl Resource for FetchResponseResource {
  /// Resource name reported for this type.
  fn name(&self) -> Cow<str> {
    "fetchResponse".into()
  }

  /// Reads up to `limit` bytes of the response body.
  ///
  /// Resolves to an empty buffer at end of body. Body errors are surfaced as
  /// type errors, and the read is aborted when the resource's cancel handle
  /// fires.
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
    Box::pin(async move {
      let mut reader =
        RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;

      // Get the body stream, converting the response into one on first read.
      let body = loop {
        match &mut *reader {
          FetchResponseReader::BodyReader(reader) => break reader,
          FetchResponseReader::Start(_) => {}
        }

        // Still in `Start`: take the response out (leaving the default empty
        // reader behind), turn its body into a stream, and loop once more to
        // hit the `BodyReader` arm above.
        match std::mem::take(&mut *reader) {
          FetchResponseReader::Start(resp) => {
            let stream: BytesStream =
              Box::pin(resp.into_body().into_data_stream().map(|r| {
                r.map_err(|err| {
                  std::io::Error::new(std::io::ErrorKind::Other, err)
                })
              }));
            *reader = FetchResponseReader::BodyReader(stream.peekable());
          }
          FetchResponseReader::BodyReader(_) => unreachable!(),
        }
      };
      let fut = async move {
        let mut reader = Pin::new(body);
        loop {
          match reader.as_mut().peek_mut().await {
            Some(Ok(chunk)) if !chunk.is_empty() => {
              // Split off at most `limit` bytes; the rest of the chunk stays
              // peeked for the next read.
              let len = min(limit, chunk.len());
              let chunk = chunk.split_to(len);
              break Ok(chunk.into());
            }
            // `peek_mut()` returned `Some`, so `next()` yields that same
            // peeked item and the `unwrap` cannot fail. The item is either an
            // exhausted (empty) chunk, which is dropped, or an error.
            Some(_) => match reader.as_mut().next().await.unwrap() {
              Ok(chunk) => assert!(chunk.is_empty()),
              Err(err) => break Err(JsErrorBox::type_error(err.to_string())),
            },
            None => break Ok(BufView::empty()),
          }
        }
      };

      let cancel_handle = RcRef::map(self, |r| &r.cancel);
      fut
        .try_or_cancel(cancel_handle)
        .await
        .map_err(JsErrorBox::from_err)
    })
  }

  /// Returns `(size, Some(size))` when the size is known, `(0, None)`
  /// otherwise.
  fn size_hint(&self) -> (u64, Option<u64>) {
    (self.size.unwrap_or(0), self.size)
  }

  /// Cancels any pending read.
  fn close(self: Rc<Self>) {
    self.cancel.cancel()
  }
}
853
854pub struct HttpClientResource {
855 pub client: Client,
856 pub allow_host: bool,
857}
858
859impl Resource for HttpClientResource {
860 fn name(&self) -> Cow<str> {
861 "httpClient".into()
862 }
863}
864
865impl HttpClientResource {
866 fn new(client: Client, allow_host: bool) -> Self {
867 Self { client, allow_host }
868 }
869}
870
/// Arguments of `op_fetch_custom_client`, deserialized from camelCase keys.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CreateHttpClientArgs {
  /// Additional CA certificates; each string is passed on as raw bytes.
  ca_certs: Vec<String>,
  /// Optional proxy; its URL is subject to a net permission check.
  proxy: Option<Proxy>,
  /// Maximum number of idle pooled connections per host.
  pool_max_idle_per_host: Option<usize>,
  /// Idle timeout of pooled connections: absent or `true` leaves the builder
  /// default untouched, a number is a timeout in milliseconds, and `false`
  /// (or any other value) passes `None` to the builder.
  pool_idle_timeout: Option<serde_json::Value>,
  /// Use the hickory DNS resolver instead of the default one.
  #[serde(default)]
  use_hickory_resolver: bool,
  /// Enable HTTP/1; defaults to `true`.
  #[serde(default = "default_true")]
  http1: bool,
  /// Enable HTTP/2; defaults to `true`.
  #[serde(default = "default_true")]
  http2: bool,
  /// Forward a caller-supplied `Host` header; defaults to `false`.
  #[serde(default)]
  allow_host: bool,
}
887
/// Serde default for the `http1` / `http2` fields of `CreateHttpClientArgs`.
fn default_true() -> bool {
  true
}
891
892#[op2(stack_trace)]
893#[smi]
894pub fn op_fetch_custom_client<FP>(
895 state: &mut OpState,
896 #[serde] args: CreateHttpClientArgs,
897 #[cppgc] tls_keys: &TlsKeysHolder,
898) -> Result<ResourceId, FetchError>
899where
900 FP: FetchPermissions + 'static,
901{
902 if let Some(proxy) = args.proxy.clone() {
903 let permissions = state.borrow_mut::<FP>();
904 let url = Url::parse(&proxy.url)?;
905 permissions.check_net_url(&url, "Deno.createHttpClient()")?;
906 }
907
908 let options = state.borrow::<Options>();
909 let ca_certs = args
910 .ca_certs
911 .into_iter()
912 .map(|cert| cert.into_bytes())
913 .collect::<Vec<_>>();
914
915 let client = create_http_client(
916 &options.user_agent,
917 CreateHttpClientOptions {
918 root_cert_store: options
919 .root_cert_store()
920 .map_err(HttpClientCreateError::RootCertStore)?,
921 ca_certs,
922 proxy: args.proxy,
923 dns_resolver: if args.use_hickory_resolver {
924 dns::Resolver::hickory().map_err(FetchError::Dns)?
925 } else {
926 dns::Resolver::default()
927 },
928 unsafely_ignore_certificate_errors: options
929 .unsafely_ignore_certificate_errors
930 .clone(),
931 client_cert_chain_and_key: tls_keys.take().try_into().unwrap(),
932 pool_max_idle_per_host: args.pool_max_idle_per_host,
933 pool_idle_timeout: args.pool_idle_timeout.and_then(
934 |timeout| match timeout {
935 serde_json::Value::Bool(true) => None,
936 serde_json::Value::Bool(false) => Some(None),
937 serde_json::Value::Number(specify) => {
938 Some(Some(specify.as_u64().unwrap_or_default()))
939 }
940 _ => Some(None),
941 },
942 ),
943 http1: args.http1,
944 http2: args.http2,
945 client_builder_hook: options.client_builder_hook,
946 },
947 )?;
948
949 let rid = state
950 .resource_table
951 .add(HttpClientResource::new(client, args.allow_host));
952 Ok(rid)
953}
954
/// Inputs of [`create_http_client`].
#[derive(Debug, Clone)]
pub struct CreateHttpClientOptions {
  /// Root certificate store for TLS, if any.
  pub root_cert_store: Option<RootCertStore>,
  /// Additional CA certificates as raw bytes.
  pub ca_certs: Vec<Vec<u8>>,
  /// Explicit proxy; it is prepended to the proxies read from the
  /// environment.
  pub proxy: Option<Proxy>,
  /// DNS resolver used by the HTTP connector.
  pub dns_resolver: dns::Resolver,
  /// Forwarded unchanged to `deno_tls::create_client_config`.
  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
  /// Optional TLS client certificate chain and key.
  pub client_cert_chain_and_key: Option<TlsKey>,
  /// Maximum idle pooled connections per host; `None` keeps the builder
  /// default.
  pub pool_max_idle_per_host: Option<usize>,
  /// Pool idle timeout in milliseconds. The outer `None` keeps the builder
  /// default; `Some(None)` passes `None` to the builder.
  pub pool_idle_timeout: Option<Option<u64>>,
  /// Offer HTTP/1.1.
  pub http1: bool,
  /// Offer HTTP/2.
  pub http2: bool,
  /// Hook that may customize the hyper client builder before the pool
  /// options above are applied.
  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
}
969
970impl Default for CreateHttpClientOptions {
971 fn default() -> Self {
972 CreateHttpClientOptions {
973 root_cert_store: None,
974 ca_certs: vec![],
975 proxy: None,
976 dns_resolver: dns::Resolver::default(),
977 unsafely_ignore_certificate_errors: None,
978 client_cert_chain_and_key: None,
979 pool_max_idle_per_host: None,
980 pool_idle_timeout: None,
981 http1: true,
982 http2: true,
983 client_builder_hook: None,
984 }
985 }
986}
987
/// Errors produced while building an HTTP client.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
#[class(type)]
pub enum HttpClientCreateError {
  /// Creating the TLS client configuration failed.
  #[error(transparent)]
  Tls(deno_tls::TlsError),
  /// The user agent string is not a valid header value.
  #[error("Illegal characters in User-Agent: received {0}")]
  InvalidUserAgent(String),
  /// The proxy URL was rejected.
  #[error("invalid proxy url")]
  InvalidProxyUrl,
  /// Both `http1` and `http2` were disabled.
  #[error("Cannot create Http Client: either `http1` or `http2` needs to be set to true")]
  HttpVersionSelectionInvalid,
  /// Obtaining the root certificate store failed; the error class is taken
  /// from the wrapped error.
  #[class(inherit)]
  #[error(transparent)]
  RootCertStore(JsErrorBox),
}
1003
1004pub fn create_http_client(
1007 user_agent: &str,
1008 options: CreateHttpClientOptions,
1009) -> Result<Client, HttpClientCreateError> {
1010 let mut tls_config = deno_tls::create_client_config(
1011 options.root_cert_store,
1012 options.ca_certs,
1013 options.unsafely_ignore_certificate_errors,
1014 options.client_cert_chain_and_key.into(),
1015 deno_tls::SocketUse::Http,
1016 )
1017 .map_err(HttpClientCreateError::Tls)?;
1018
1019 tls_config.alpn_protocols.clear();
1021 let proxy_tls_config = Arc::from(tls_config.clone());
1022
1023 let mut alpn_protocols = vec![];
1024 if options.http2 {
1025 alpn_protocols.push("h2".into());
1026 }
1027 if options.http1 {
1028 alpn_protocols.push("http/1.1".into());
1029 }
1030 tls_config.alpn_protocols = alpn_protocols;
1031 let tls_config = Arc::from(tls_config);
1032
1033 let mut http_connector =
1034 HttpConnector::new_with_resolver(options.dns_resolver.clone());
1035 http_connector.enforce_http(false);
1036
1037 let user_agent = user_agent.parse::<HeaderValue>().map_err(|_| {
1038 HttpClientCreateError::InvalidUserAgent(user_agent.to_string())
1039 })?;
1040
1041 let mut builder = HyperClientBuilder::new(TokioExecutor::new());
1042 builder.timer(TokioTimer::new());
1043 builder.pool_timer(TokioTimer::new());
1044
1045 if let Some(client_builder_hook) = options.client_builder_hook {
1046 builder = client_builder_hook(builder);
1047 }
1048
1049 let mut proxies = proxy::from_env();
1050 if let Some(proxy) = options.proxy {
1051 let mut intercept = proxy::Intercept::all(&proxy.url)
1052 .ok_or_else(|| HttpClientCreateError::InvalidProxyUrl)?;
1053 if let Some(basic_auth) = &proxy.basic_auth {
1054 intercept.set_auth(&basic_auth.username, &basic_auth.password);
1055 }
1056 proxies.prepend(intercept);
1057 }
1058 let proxies = Arc::new(proxies);
1059 let connector = proxy::ProxyConnector {
1060 http: http_connector,
1061 proxies: proxies.clone(),
1062 tls: tls_config,
1063 tls_proxy: proxy_tls_config,
1064 user_agent: Some(user_agent.clone()),
1065 };
1066
1067 if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
1068 builder.pool_max_idle_per_host(pool_max_idle_per_host);
1069 }
1070
1071 if let Some(pool_idle_timeout) = options.pool_idle_timeout {
1072 builder.pool_idle_timeout(
1073 pool_idle_timeout.map(std::time::Duration::from_millis),
1074 );
1075 }
1076
1077 match (options.http1, options.http2) {
1078 (true, false) => {} (false, true) => {
1080 builder.http2_only(true);
1081 }
1082 (true, true) => {}
1083 (false, false) => {
1084 return Err(HttpClientCreateError::HttpVersionSelectionInvalid)
1085 }
1086 }
1087
1088 let pooled_client = builder.build(connector);
1089 let retry_client = retry::Retry::new(FetchRetry, pooled_client);
1090 let decompress = Decompression::new(retry_client).gzip(true).br(true);
1091
1092 Ok(Client {
1093 inner: decompress,
1094 proxies,
1095 user_agent,
1096 })
1097}
1098
// Converts the given string into a `ByteString` using the type's `Into`
// conversion.
#[op2]
#[serde]
pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString {
  input.into()
}
1104
/// HTTP client used by fetch: a pooled hyper client wrapped in a retry layer
/// and a response decompression layer.
#[derive(Clone, Debug)]
pub struct Client {
  /// The layered service that requests are sent through.
  inner: Decompression<
    retry::Retry<
      FetchRetry,
      hyper_util::client::legacy::Client<Connector, ReqBody>,
    >,
  >,
  /// Proxy configuration; consulted in `send` to decide whether a
  /// `Proxy-Authorization` header is added.
  proxies: Arc<proxy::Proxies>,
  /// Default `User-Agent` header value, added in `send` when a request does
  /// not set its own.
  user_agent: HeaderValue,
}
1117
/// Connector used by `Client`: the proxy-aware connector over an
/// `HttpConnector` that resolves names through `dns::Resolver`.
type Connector = proxy::ProxyConnector<HttpConnector<dns::Resolver>>;
1119
/// Default `Accept` header value (`*/*`), inserted by `Client::send` when a
/// request does not set one.
// NOTE(review): the lint is presumably silenced because clippy treats
// `HeaderValue` as interior-mutable while the constant is only ever copied
// into header maps — confirm.
#[allow(clippy::declare_interior_mutable_const)]
const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*");
1123
/// Error returned by `Client::send` when the underlying client fails.
#[derive(Debug, deno_error::JsError)]
#[class(type)]
pub struct ClientSendError {
  /// URI of the request that failed.
  uri: Uri,
  /// The underlying client error.
  pub source: hyper_util::client::legacy::Error,
}
1130
1131impl ClientSendError {
1132 pub fn is_connect_error(&self) -> bool {
1133 self.source.is_connect()
1134 }
1135
1136 fn http_info(&self) -> Option<HttpInfo> {
1137 let mut exts = Extensions::new();
1138 self.source.connect_info()?.get_extras(&mut exts);
1139 exts.remove::<HttpInfo>()
1140 }
1141}
1142
1143impl std::fmt::Display for ClientSendError {
1144 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1145 let detail = error_reporter::Report::new(&self.source);
1147
1148 match self.http_info() {
1149 Some(http_info) => {
1150 write!(
1151 f,
1152 "error sending request from {src} for {uri} ({dst}): {detail}",
1153 src = http_info.local_addr(),
1154 uri = self.uri,
1155 dst = http_info.remote_addr(),
1156 detail = detail,
1157 )
1158 }
1159 None => {
1160 write!(
1161 f,
1162 "error sending request for url ({uri}): {detail}",
1163 uri = self.uri,
1164 detail = detail,
1165 )
1166 }
1167 }
1168 }
1169}
1170
1171impl std::error::Error for ClientSendError {
1172 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1173 Some(&self.source)
1174 }
1175}
1176
1177impl Client {
1178 pub async fn send(
1179 self,
1180 mut req: http::Request<ReqBody>,
1181 ) -> Result<http::Response<ResBody>, ClientSendError> {
1182 req
1183 .headers_mut()
1184 .entry(USER_AGENT)
1185 .or_insert_with(|| self.user_agent.clone());
1186
1187 req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
1188
1189 if let Some(auth) = self.proxies.http_forward_auth(req.uri()) {
1190 req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
1191 }
1192
1193 let uri = req.uri().clone();
1194
1195 let resp = self
1196 .inner
1197 .oneshot(req)
1198 .await
1199 .map_err(|e| ClientSendError { uri, source: e })?;
1200 Ok(resp.map(|b| b.map_err(|e| JsErrorBox::generic(e.to_string())).boxed()))
1201 }
1202}
1203
/// Request body accepted by `Client::send`.
pub enum ReqBody {
  /// A body whose bytes are all available up front.
  Full(http_body_util::Full<Bytes>),
  /// A body that carries no data.
  Empty(http_body_util::Empty<Bytes>),
  /// A boxed body yielding `Bytes` frames that may fail with `JsErrorBox`.
  /// Requests with this kind of body are never cloned for a retry (see
  /// `FetchRetry::clone_request`).
  Streaming(BoxBody<Bytes, JsErrorBox>),
}
1210
/// Response body type returned by `Client::send`: a boxed body of `Bytes`
/// frames whose errors are reported as `JsErrorBox`.
pub type ResBody = BoxBody<Bytes, JsErrorBox>;
1212
1213impl ReqBody {
1214 pub fn full(bytes: Bytes) -> Self {
1215 ReqBody::Full(http_body_util::Full::new(bytes))
1216 }
1217
1218 pub fn empty() -> Self {
1219 ReqBody::Empty(http_body_util::Empty::new())
1220 }
1221
1222 pub fn streaming<B>(body: B) -> Self
1223 where
1224 B: hyper::body::Body<Data = Bytes, Error = JsErrorBox>
1225 + Send
1226 + Sync
1227 + 'static,
1228 {
1229 ReqBody::Streaming(BoxBody::new(body))
1230 }
1231}
1232
1233impl hyper::body::Body for ReqBody {
1234 type Data = Bytes;
1235 type Error = JsErrorBox;
1236
1237 fn poll_frame(
1238 mut self: Pin<&mut Self>,
1239 cx: &mut Context<'_>,
1240 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1241 match &mut *self {
1242 ReqBody::Full(ref mut b) => {
1243 Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1244 }
1245 ReqBody::Empty(ref mut b) => {
1246 Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1247 }
1248 ReqBody::Streaming(ref mut b) => Pin::new(b).poll_frame(cx),
1249 }
1250 }
1251
1252 fn is_end_stream(&self) -> bool {
1253 match self {
1254 ReqBody::Full(ref b) => b.is_end_stream(),
1255 ReqBody::Empty(ref b) => b.is_end_stream(),
1256 ReqBody::Streaming(ref b) => b.is_end_stream(),
1257 }
1258 }
1259
1260 fn size_hint(&self) -> hyper::body::SizeHint {
1261 match self {
1262 ReqBody::Full(ref b) => b.size_hint(),
1263 ReqBody::Empty(ref b) => b.size_hint(),
1264 ReqBody::Streaming(ref b) => b.size_hint(),
1265 }
1266 }
1267}
1268
1269pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
1273 use percent_encoding::percent_decode;
1274
1275 if url.has_authority() {
1276 let username: String = percent_decode(url.username().as_bytes())
1277 .decode_utf8()
1278 .ok()?
1279 .into();
1280 let password = url.password().and_then(|pass| {
1281 percent_decode(pass.as_bytes())
1282 .decode_utf8()
1283 .ok()
1284 .map(String::from)
1285 });
1286 if !username.is_empty() || password.is_some() {
1287 url
1288 .set_username("")
1289 .expect("has_authority means set_username shouldn't fail");
1290 url
1291 .set_password(None)
1292 .expect("has_authority means set_password shouldn't fail");
1293 return Some((username, password));
1294 }
1295 }
1296
1297 None
1298}
1299
1300#[op2(fast)]
1301fn op_fetch_promise_is_settled(promise: v8::Local<v8::Promise>) -> bool {
1302 promise.state() != v8::PromiseState::Pending
1303}
1304
/// Retry policy for fetch requests; its behavior is defined by the
/// `retry::Policy` implementation for this type.
#[derive(Clone, Debug)]
struct FetchRetry;
1308
/// Marker stored in a request's extensions by `FetchRetry` once a retry has
/// been scheduled for it, so that the same request is not retried again.
#[derive(Clone, Debug)]
struct Retried;
1312
1313impl<ResBody, E>
1314 retry::Policy<http::Request<ReqBody>, http::Response<ResBody>, E>
1315 for FetchRetry
1316where
1317 E: std::error::Error + 'static,
1318{
1319 type Future = future::Ready<()>;
1321
1322 fn retry(
1323 &mut self,
1324 req: &mut http::Request<ReqBody>,
1325 result: &mut Result<http::Response<ResBody>, E>,
1326 ) -> Option<Self::Future> {
1327 if req.extensions().get::<Retried>().is_some() {
1328 return None;
1330 }
1331
1332 match result {
1333 Ok(..) => {
1334 None
1336 }
1337 Err(err) => {
1338 if is_error_retryable(&*err) {
1339 req.extensions_mut().insert(Retried);
1340 Some(future::ready(()))
1341 } else {
1342 None
1343 }
1344 }
1345 }
1346 }
1347
1348 fn clone_request(
1349 &mut self,
1350 req: &http::Request<ReqBody>,
1351 ) -> Option<http::Request<ReqBody>> {
1352 let body = match req.body() {
1353 ReqBody::Full(b) => ReqBody::Full(b.clone()),
1354 ReqBody::Empty(b) => ReqBody::Empty(*b),
1355 ReqBody::Streaming(..) => return None,
1356 };
1357
1358 let mut clone = http::Request::new(body);
1359 *clone.method_mut() = req.method().clone();
1360 *clone.uri_mut() = req.uri().clone();
1361 *clone.headers_mut() = req.headers().clone();
1362 *clone.extensions_mut() = req.extensions().clone();
1363 Some(clone)
1364 }
1365}
1366
1367fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool {
1368 if let Some(err) = find_source::<h2::Error>(err) {
1370 if err.is_go_away()
1372 && err.is_remote()
1373 && err.reason() == Some(h2::Reason::NO_ERROR)
1374 {
1375 return true;
1376 }
1377
1378 if err.is_reset()
1381 && err.is_remote()
1382 && err.reason() == Some(h2::Reason::REFUSED_STREAM)
1383 {
1384 return true;
1385 }
1386 }
1387
1388 false
1389}
1390
/// Walks `err` and its chain of `source()` causes and returns the first error
/// in that chain whose concrete type is `E`.
///
/// `err` itself is checked first. Returns `None` when no error in the chain
/// is an `E`.
fn find_source<'a, E: std::error::Error + 'static>(
  err: &'a (dyn std::error::Error + 'static),
) -> Option<&'a E> {
  std::iter::successors(Some(err), |cause| cause.source())
    .find_map(|cause| cause.downcast_ref::<E>())
}