1pub mod dns;
4mod fs_fetch_handler;
5mod proxy;
6#[cfg(test)]
7mod tests;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::cmp::min;
12use std::convert::From;
13use std::future;
14use std::future::Future;
15use std::net::IpAddr;
16use std::path::Path;
17#[cfg(not(windows))]
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24
25use bytes::Bytes;
26pub use data_url;
28use data_url::DataUrl;
29use deno_core::AsyncRefCell;
30use deno_core::AsyncResult;
31use deno_core::BufView;
32use deno_core::ByteString;
33use deno_core::CancelFuture;
34use deno_core::CancelHandle;
35use deno_core::CancelTryFuture;
36use deno_core::Canceled;
37use deno_core::JsBuffer;
38use deno_core::OpState;
39use deno_core::RcRef;
40use deno_core::Resource;
41use deno_core::ResourceId;
42use deno_core::futures::FutureExt;
43use deno_core::futures::Stream;
44use deno_core::futures::StreamExt;
45use deno_core::futures::TryFutureExt;
46use deno_core::futures::stream::Peekable;
47use deno_core::op2;
48use deno_core::url;
49use deno_core::url::Url;
50use deno_core::v8;
51use deno_error::JsErrorBox;
52pub use deno_fs::FsError;
53use deno_path_util::PathToUrlError;
54use deno_permissions::CheckedPath;
55use deno_permissions::OpenAccessKind;
56use deno_permissions::PermissionCheckError;
57use deno_tls::Proxy;
58use deno_tls::RootCertStoreProvider;
59use deno_tls::SocketUse;
60use deno_tls::TlsKey;
61use deno_tls::TlsKeys;
62use deno_tls::TlsKeysHolder;
63use deno_tls::rustls::RootCertStore;
64pub use fs_fetch_handler::FsFetchHandler;
65use http::Extensions;
66use http::HeaderMap;
67use http::Method;
68use http::Uri;
69use http::header::ACCEPT;
70use http::header::ACCEPT_ENCODING;
71use http::header::AUTHORIZATION;
72use http::header::CONTENT_LENGTH;
73use http::header::HOST;
74use http::header::HeaderName;
75use http::header::HeaderValue;
76use http::header::PROXY_AUTHORIZATION;
77use http::header::RANGE;
78use http::header::USER_AGENT;
79use http_body_util::BodyExt;
80use http_body_util::combinators::BoxBody;
81use hyper::body::Frame;
82use hyper_util::client::legacy::Builder as HyperClientBuilder;
83use hyper_util::client::legacy::connect::Connection;
84use hyper_util::client::legacy::connect::HttpConnector;
85use hyper_util::client::legacy::connect::HttpInfo;
86use hyper_util::rt::TokioExecutor;
87use hyper_util::rt::TokioIo;
88use hyper_util::rt::TokioTimer;
89pub use proxy::basic_auth;
90use serde::Deserialize;
91use serde::Serialize;
92use tower::BoxError;
93use tower::Service;
94use tower::ServiceExt;
95use tower::retry;
96use tower_http::decompression::Decompression;
97
/// Configuration for the fetch extension.
///
/// An instance is placed into `OpState` when the extension is initialized and
/// is read back whenever a client has to be created or a request is built.
#[derive(Clone)]
pub struct Options {
  /// User agent string handed to `create_http_client`.
  pub user_agent: String,
  /// Optional provider that yields the root certificate store on demand.
  pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
  /// Optional proxy configuration forwarded to the default client.
  pub proxy: Option<Proxy>,
  /// Optional hook that receives the hyper client builder and returns the
  /// builder that is actually used to construct the client.
  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
  /// Optional hook invoked with every outgoing `http`/`https` request before
  /// it is sent; an error returned from the hook aborts the fetch.
  #[allow(clippy::type_complexity)]
  pub request_builder_hook:
    Option<fn(&mut http::Request<ReqBody>) -> Result<(), JsErrorBox>>,
  /// Forwarded unchanged to the TLS client configuration.
  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
  /// Client certificate chain and key used by the default client.
  pub client_cert_chain_and_key: TlsKeys,
  /// Handler used by `op_fetch` for `file:` URLs.
  pub file_fetch_handler: Rc<dyn FetchHandler>,
  /// DNS resolver used by the default client.
  pub resolver: dns::Resolver,
}
121
122impl Options {
123 pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, JsErrorBox> {
124 Ok(match &self.root_cert_store_provider {
125 Some(provider) => Some(provider.get_or_try_init()?.clone()),
126 None => None,
127 })
128 }
129}
130
131impl Default for Options {
132 fn default() -> Self {
133 Self {
134 user_agent: "".to_string(),
135 root_cert_store_provider: None,
136 proxy: None,
137 client_builder_hook: None,
138 request_builder_hook: None,
139 unsafely_ignore_certificate_errors: None,
140 client_cert_chain_and_key: TlsKeys::Null,
141 file_fetch_handler: Rc::new(DefaultFileFetchHandler),
142 resolver: dns::Resolver::default(),
143 }
144 }
145}
146
// Declares the `deno_fetch` extension: its extension dependencies, the
// permission type parameter `FP`, the ops it registers, the JS modules it
// ships, and the `Options` value that is stored in `OpState` on startup.
deno_core::extension!(deno_fetch,
  deps = [ deno_webidl, deno_web, deno_url, deno_console ],
  parameters = [FP: FetchPermissions],
  ops = [
    op_fetch<FP>,
    op_fetch_send,
    op_utf8_to_byte_string,
    op_fetch_custom_client<FP>,
    op_fetch_promise_is_settled,
  ],
  esm = [
    "20_headers.js",
    "21_formdata.js",
    "22_body.js",
    "22_http_client.js",
    "23_request.js",
    "23_response.js",
    "26_fetch.js",
    "27_eventsource.js"
  ],
  options = {
    options: Options,
  },
  state = |state, options| {
    // Make the options available to the ops through `OpState`.
    state.put::<Options>(options.options);
  },
);
174
/// Errors surfaced by the fetch ops.
///
/// The `#[class(..)]` attribute on each variant selects the JavaScript error
/// class used when the error is reported; `inherit` defers to the wrapped
/// error.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum FetchError {
  #[class(inherit)]
  #[error(transparent)]
  Resource(#[from] deno_core::error::ResourceError),
  #[class(inherit)]
  #[error(transparent)]
  Permission(#[from] PermissionCheckError),
  /// Generic failure; also used when a file system error other than a
  /// permission error is converted into a `FetchError`.
  #[class(type)]
  #[error("NetworkError when attempting to fetch resource")]
  NetworkError,
  /// A `file:` URL was requested with a method other than `GET`.
  #[class(type)]
  #[error("Fetching files only supports the GET method: received {0}")]
  FsNotGet(Method),
  #[class(inherit)]
  #[error(transparent)]
  PathToUrl(#[from] PathToUrlError),
  /// The URL could not be converted into an `http::Uri`.
  #[class(type)]
  #[error("Invalid URL {0}")]
  InvalidUrl(Url),
  #[class(type)]
  #[error(transparent)]
  InvalidHeaderName(#[from] http::header::InvalidHeaderName),
  #[class(type)]
  #[error(transparent)]
  InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
  /// A `data:` URL could not be processed.
  #[class(type)]
  #[error("{0:?}")]
  DataUrl(data_url::DataUrlError),
  /// The body of a `data:` URL could not be decoded.
  #[class(type)]
  #[error("{0:?}")]
  Base64(data_url::forgiving_base64::InvalidBase64),
  /// Returned by `op_fetch` for every `blob:` URL that reaches it.
  #[class(type)]
  #[error("Blob for the given URL not found.")]
  BlobNotFound,
  /// The URL scheme is none of `file`, `http`, `https`, `data` or `blob`.
  #[class(type)]
  #[error("Url scheme '{0}' not supported")]
  SchemeNotSupported(String),
  /// The request future was canceled before it produced a result.
  #[class(type)]
  #[error("Request was cancelled")]
  RequestCanceled,
  #[class(generic)]
  #[error(transparent)]
  Http(#[from] http::Error),
  #[class(inherit)]
  #[error(transparent)]
  ClientCreate(#[from] HttpClientCreateError),
  #[class(inherit)]
  #[error(transparent)]
  Url(#[from] url::ParseError),
  #[class(type)]
  #[error(transparent)]
  Method(#[from] http::method::InvalidMethod),
  #[class(inherit)]
  #[error(transparent)]
  ClientSend(#[from] ClientSendError),
  /// Error returned by the configured `request_builder_hook`.
  #[class(inherit)]
  #[error(transparent)]
  RequestBuilderHook(JsErrorBox),
  #[class(inherit)]
  #[error(transparent)]
  Io(#[from] std::io::Error),
  #[class(generic)]
  #[error(transparent)]
  Dns(hickory_resolver::ResolveError),
  // NOTE(review): unlike `Permission` above, this variant is classed as
  // `generic` rather than `inherit` — confirm that this is intended.
  /// Permission error carried over from a `deno_fs::FsError`.
  #[class(generic)]
  #[error(transparent)]
  PermissionCheck(PermissionCheckError),
}
244
245impl From<deno_fs::FsError> for FetchError {
246 fn from(value: deno_fs::FsError) -> Self {
247 match value {
248 deno_fs::FsError::Io(_)
249 | deno_fs::FsError::FileBusy
250 | deno_fs::FsError::NotSupported => FetchError::NetworkError,
251 deno_fs::FsError::PermissionCheck(err) => {
252 FetchError::PermissionCheck(err)
253 }
254 }
255 }
256}
257
/// Boxed, pinned future that resolves to a `CancelableResponseResult`.
pub type CancelableResponseFuture =
  Pin<Box<dyn Future<Output = CancelableResponseResult>>>;
260
/// Strategy used by `op_fetch` to serve `file:` URLs.
pub trait FetchHandler: dyn_clone::DynClone {
  /// Starts fetching `url`.
  ///
  /// Returns the future that yields the response together with an optional
  /// handle that can be used to cancel it.
  fn fetch_file(
    &self,
    state: &mut OpState,
    url: &Url,
  ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
}

// Generates `Clone` for boxed `FetchHandler` trait objects.
dyn_clone::clone_trait_object!(FetchHandler);
273
/// `FetchHandler` installed by `Options::default()`; it answers every file
/// fetch with `FetchError::NetworkError`.
#[derive(Clone)]
pub struct DefaultFileFetchHandler;
277
278impl FetchHandler for DefaultFileFetchHandler {
279 fn fetch_file(
280 &self,
281 _state: &mut OpState,
282 _url: &Url,
283 ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
284 let fut = async move { Ok(Err(FetchError::NetworkError)) };
285 (Box::pin(fut), None)
286 }
287}
288
/// Value returned by `op_fetch`, serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
  /// Resource id of the pending `FetchRequestResource`.
  pub request_rid: ResourceId,
  /// Resource id of the `FetchCancelHandle`, when the request is cancelable.
  pub cancel_handle_rid: Option<ResourceId>,
}
295
296pub fn get_or_create_client_from_state(
297 state: &mut OpState,
298) -> Result<Client, HttpClientCreateError> {
299 if let Some(client) = state.try_borrow::<Client>() {
300 Ok(client.clone())
301 } else {
302 let options = state.borrow::<Options>();
303 let client = create_client_from_options(options)?;
304 state.put::<Client>(client.clone());
305 Ok(client)
306 }
307}
308
309pub fn create_client_from_options(
310 options: &Options,
311) -> Result<Client, HttpClientCreateError> {
312 create_http_client(
313 &options.user_agent,
314 CreateHttpClientOptions {
315 root_cert_store: options
316 .root_cert_store()
317 .map_err(HttpClientCreateError::RootCertStore)?,
318 ca_certs: vec![],
319 proxy: options.proxy.clone(),
320 dns_resolver: options.resolver.clone(),
321 unsafely_ignore_certificate_errors: options
322 .unsafely_ignore_certificate_errors
323 .clone(),
324 client_cert_chain_and_key: options
325 .client_cert_chain_and_key
326 .clone()
327 .try_into()
328 .unwrap_or_default(),
329 pool_max_idle_per_host: None,
330 pool_idle_timeout: None,
331 http1: true,
332 http2: true,
333 local_address: None,
334 client_builder_hook: options.client_builder_hook,
335 },
336 )
337}
338
/// Adapts a `Resource` into a byte stream / HTTP body.
///
/// Field `0` is the resource being read; field `1` is the read that is
/// currently in flight, or `None` once the stream has finished.
#[allow(clippy::type_complexity)]
pub struct ResourceToBodyAdapter(
  Rc<dyn Resource>,
  Option<Pin<Box<dyn Future<Output = Result<BufView, JsErrorBox>>>>>,
);
344
345impl ResourceToBodyAdapter {
346 pub fn new(resource: Rc<dyn Resource>) -> Self {
347 let future = resource.clone().read(64 * 1024);
348 Self(resource, Some(future))
349 }
350}
351
// NOTE(review): the adapter stores an `Rc` and a non-`Send` boxed future, so
// these impls are only sound if the adapter is never actually accessed from
// more than one thread — presumably guaranteed by the runtime; confirm.
unsafe impl Send for ResourceToBodyAdapter {}
// NOTE(review): same assumption as for `Send` above — confirm.
unsafe impl Sync for ResourceToBodyAdapter {}
356
357impl Stream for ResourceToBodyAdapter {
358 type Item = Result<Bytes, JsErrorBox>;
359
360 fn poll_next(
361 self: Pin<&mut Self>,
362 cx: &mut Context<'_>,
363 ) -> Poll<Option<Self::Item>> {
364 let this = self.get_mut();
365 match this.1.take() {
366 Some(mut fut) => match fut.poll_unpin(cx) {
367 Poll::Pending => {
368 this.1 = Some(fut);
369 Poll::Pending
370 }
371 Poll::Ready(res) => match res {
372 Ok(buf) if buf.is_empty() => Poll::Ready(None),
373 Ok(buf) => {
374 this.1 = Some(this.0.clone().read(64 * 1024));
375 Poll::Ready(Some(Ok(buf.to_vec().into())))
376 }
377 Err(err) => Poll::Ready(Some(Err(err))),
378 },
379 },
380 _ => Poll::Ready(None),
381 }
382 }
383}
384
385impl hyper::body::Body for ResourceToBodyAdapter {
386 type Data = Bytes;
387 type Error = JsErrorBox;
388
389 fn poll_frame(
390 self: Pin<&mut Self>,
391 cx: &mut Context<'_>,
392 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
393 match self.poll_next(cx) {
394 Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
395 Poll::Ready(None) => Poll::Ready(None),
396 Poll::Pending => Poll::Pending,
397 }
398 }
399}
400
401impl Drop for ResourceToBodyAdapter {
402 fn drop(&mut self) {
403 self.0.clone().close()
404 }
405}
406
/// Permission checks required by the fetch ops.
///
/// `api_name` identifies the calling API (for example `"fetch()"`) and is
/// passed through to the underlying permission check.
pub trait FetchPermissions {
  /// Checks network access to `host` on `port`.
  fn check_net(
    &mut self,
    host: &str,
    port: u16,
    api_name: &str,
  ) -> Result<(), PermissionCheckError>;
  /// Checks network access to `url`.
  fn check_net_url(
    &mut self,
    url: &Url,
    api_name: &str,
  ) -> Result<(), PermissionCheckError>;
  /// Checks that `path` may be opened with `open_access` and returns the
  /// checked path, which callers should use instead of the original one.
  #[must_use = "the resolved return value to mitigate time-of-check to time-of-use issues"]
  fn check_open<'a>(
    &mut self,
    path: Cow<'a, Path>,
    open_access: OpenAccessKind,
    api_name: &str,
  ) -> Result<CheckedPath<'a>, PermissionCheckError>;
  /// Checks access to the vsock endpoint identified by `cid` and `port`.
  fn check_net_vsock(
    &mut self,
    cid: u32,
    port: u32,
    api_name: &str,
  ) -> Result<(), PermissionCheckError>;
}
433
434impl FetchPermissions for deno_permissions::PermissionsContainer {
435 #[inline(always)]
436 fn check_net(
437 &mut self,
438 host: &str,
439 port: u16,
440 api_name: &str,
441 ) -> Result<(), PermissionCheckError> {
442 deno_permissions::PermissionsContainer::check_net(
443 self,
444 &(host, Some(port)),
445 api_name,
446 )
447 }
448
449 #[inline(always)]
450 fn check_net_url(
451 &mut self,
452 url: &Url,
453 api_name: &str,
454 ) -> Result<(), PermissionCheckError> {
455 deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
456 }
457
458 #[inline(always)]
459 fn check_open<'a>(
460 &mut self,
461 path: Cow<'a, Path>,
462 open_access: OpenAccessKind,
463 api_name: &str,
464 ) -> Result<CheckedPath<'a>, PermissionCheckError> {
465 deno_permissions::PermissionsContainer::check_open(
466 self,
467 path,
468 open_access,
469 Some(api_name),
470 )
471 }
472
473 #[inline(always)]
474 fn check_net_vsock(
475 &mut self,
476 cid: u32,
477 port: u32,
478 api_name: &str,
479 ) -> Result<(), PermissionCheckError> {
480 deno_permissions::PermissionsContainer::check_net_vsock(
481 self, cid, port, api_name,
482 )
483 }
484}
485
// Starts a fetch for `url` and registers the pending request as a resource.
//
// Dispatches on the URL scheme:
// - `file`: GET only; delegated to the configured `file_fetch_handler`.
// - `http` / `https`: checks net permission, builds the request and sends it
//   through the selected client; the request is cancelable.
// - `data`: decoded in place into an already-resolved response.
// - `blob`: always fails with `FetchError::BlobNotFound`.
// - anything else: `FetchError::SchemeNotSupported`.
//
// Returns the rid of the request resource plus the rid of the cancel handle
// when there is one. The response is obtained later via `op_fetch_send`.
#[op2(stack_trace)]
#[serde]
#[allow(clippy::too_many_arguments)]
#[allow(clippy::large_enum_variant)]
#[allow(clippy::result_large_err)]
pub fn op_fetch<FP>(
  state: &mut OpState,
  #[serde] method: ByteString,
  #[string] url: String,
  #[serde] headers: Vec<(ByteString, ByteString)>,
  #[smi] client_rid: Option<u32>,
  has_body: bool,
  #[buffer] data: Option<JsBuffer>,
  #[smi] resource: Option<ResourceId>,
) -> Result<FetchReturn, FetchError>
where
  FP: FetchPermissions + 'static,
{
  // Use the custom client when a rid is given, otherwise the shared default
  // client. Only a custom client can allow a user-supplied `Host` header.
  let (client, allow_host) = if let Some(rid) = client_rid {
    let r = state.resource_table.get::<HttpClientResource>(rid)?;
    (r.client.clone(), r.allow_host)
  } else {
    (get_or_create_client_from_state(state)?, false)
  };

  let method = Method::from_bytes(&method)?;
  let mut url = Url::parse(&url)?;

  // The scheme is inspected before any net permission is requested.
  let scheme = url.scheme();
  let (request_rid, cancel_handle_rid) = match scheme {
    "file" => {
      if method != Method::GET {
        return Err(FetchError::FsNotGet(method));
      }
      let Options {
        file_fetch_handler, ..
      } = state.borrow_mut::<Options>();
      // Clone the handler so `state` can be borrowed again for the call.
      let file_fetch_handler = file_fetch_handler.clone();
      let (future, maybe_cancel_handle) =
        file_fetch_handler.fetch_file(state, &url);
      let request_rid = state
        .resource_table
        .add(FetchRequestResource { future, url });
      let maybe_cancel_handle_rid = maybe_cancel_handle
        .map(|ch| state.resource_table.add(FetchCancelHandle(ch)));

      (request_rid, maybe_cancel_handle_rid)
    }
    "http" | "https" => {
      let permissions = state.borrow_mut::<FP>();
      permissions.check_net_url(&url, "fetch()")?;

      // Credentials taken from the URL (if any) become an `Authorization`
      // header below.
      let maybe_authority = extract_authority(&mut url);
      let uri = url
        .as_str()
        .parse::<Uri>()
        .map_err(|_| FetchError::InvalidUrl(url.clone()))?;

      let mut con_len = None;
      let body = if has_body {
        match (data, resource) {
          (Some(data), _) => {
            // An in-memory body takes precedence over a streaming resource.
            con_len = Some(data.len() as u64);

            ReqBody::full(data.to_vec().into())
          }
          (_, Some(resource)) => {
            let resource = state.resource_table.take_any(resource)?;
            // Only advertise a length when the size hint is exact and
            // non-zero.
            match resource.size_hint() {
              (body_size, Some(n)) if body_size == n && body_size > 0 => {
                con_len = Some(body_size);
              }
              _ => {}
            }
            ReqBody::streaming(ResourceToBodyAdapter::new(resource))
          }
          // `has_body` implies that one of the two sources was provided.
          (None, None) => unreachable!(),
        }
      } else {
        // Body-less POST and PUT requests still send `Content-Length: 0`.
        if matches!(method, Method::POST | Method::PUT) {
          con_len = Some(0);
        }
        ReqBody::empty()
      };

      let mut request = http::Request::new(body);
      *request.method_mut() = method.clone();
      *request.uri_mut() = uri.clone();

      if let Some((username, password)) = maybe_authority {
        request.headers_mut().insert(
          AUTHORIZATION,
          proxy::basic_auth(&username, password.as_deref()),
        );
      }
      if let Some(len) = con_len {
        request.headers_mut().insert(CONTENT_LENGTH, len.into());
      }

      for (key, value) in headers {
        let name = HeaderName::from_bytes(&key)?;
        let v = HeaderValue::from_bytes(&value)?;

        // A caller-supplied `Content-Length` is always dropped; `Host` is
        // dropped unless the client explicitly allows it.
        if (name != HOST || allow_host) && name != CONTENT_LENGTH {
          request.headers_mut().append(name, v);
        }
      }

      if request.headers().contains_key(RANGE) {
        // Range requests ask for the identity encoding.
        request
          .headers_mut()
          .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
      }

      let options = state.borrow::<Options>();
      if let Some(request_builder_hook) = options.request_builder_hook {
        request_builder_hook(&mut request)
          .map_err(FetchError::RequestBuilderHook)?;
      }

      let cancel_handle = CancelHandle::new_rc();
      let cancel_handle_ = cancel_handle.clone();

      let fut = async move {
        client
          .send(request)
          .map_err(Into::into)
          .or_cancel(cancel_handle_)
          .await
      };

      let request_rid = state.resource_table.add(FetchRequestResource {
        future: Box::pin(fut),
        url,
      });

      let cancel_handle_rid =
        state.resource_table.add(FetchCancelHandle(cancel_handle));

      (request_rid, Some(cancel_handle_rid))
    }
    "data" => {
      let data_url =
        DataUrl::process(url.as_str()).map_err(FetchError::DataUrl)?;

      let (body, _) = data_url.decode_to_vec().map_err(FetchError::Base64)?;
      let body = http_body_util::Full::new(body.into())
        .map_err(|never| match never {})
        .boxed();

      let response = http::Response::builder()
        .status(http::StatusCode::OK)
        .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
        .body(body)?;

      // The response is already complete, so the future resolves at once and
      // no cancel handle is needed.
      let fut = async move { Ok(Ok(response)) };

      let request_rid = state.resource_table.add(FetchRequestResource {
        future: Box::pin(fut),
        url,
      });

      (request_rid, None)
    }
    "blob" => {
      // NOTE(review): presumably resolvable blob URLs are handled before this
      // op is called, so reaching this arm means the blob is unknown —
      // confirm against the JS caller.
      return Err(FetchError::BlobNotFound);
    }
    _ => return Err(FetchError::SchemeNotSupported(scheme.to_string())),
  };

  Ok(FetchReturn {
    request_rid,
    cancel_handle_rid,
  })
}
669
/// Value returned by `op_fetch_send`, serialized with camelCase keys.
#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
  /// Numeric HTTP status code.
  pub status: u16,
  /// Canonical reason phrase for `status`, or an empty string if none.
  pub status_text: String,
  /// Response headers as name/value byte-string pairs.
  pub headers: Vec<(ByteString, ByteString)>,
  /// URL of the request this response belongs to.
  pub url: String,
  /// Resource id of the `FetchResponseResource` holding the body.
  pub response_rid: ResourceId,
  /// Exact body size when the body reports one.
  pub content_length: Option<u64>,
  /// Set instead of the other fields when sending failed with a hyper error
  /// that has an underlying cause: `(error message, cause message)`.
  pub error: Option<(String, String)>,
}
685
686#[op2(async)]
687#[serde]
688pub async fn op_fetch_send(
689 state: Rc<RefCell<OpState>>,
690 #[smi] rid: ResourceId,
691) -> Result<FetchResponse, FetchError> {
692 let request = state
693 .borrow_mut()
694 .resource_table
695 .take::<FetchRequestResource>(rid)?;
696
697 let request = Rc::try_unwrap(request)
698 .ok()
699 .expect("multiple op_fetch_send ongoing");
700
701 let res = match request.future.await {
702 Ok(Ok(res)) => res,
703 Ok(Err(err)) => {
704 if let FetchError::ClientSend(err_src) = &err
710 && let Some(client_err) = std::error::Error::source(&err_src.source)
711 && let Some(err_src) = client_err.downcast_ref::<hyper::Error>()
712 && let Some(err_src) = std::error::Error::source(err_src)
713 {
714 return Ok(FetchResponse {
715 error: Some((err.to_string(), err_src.to_string())),
716 ..Default::default()
717 });
718 }
719
720 return Err(err);
721 }
722 Err(_) => return Err(FetchError::RequestCanceled),
723 };
724
725 let status = res.status();
726 let url = request.url.into();
727 let mut res_headers = Vec::new();
728 for (key, val) in res.headers().iter() {
729 res_headers.push((key.as_str().into(), val.as_bytes().into()));
730 }
731
732 let content_length = hyper::body::Body::size_hint(res.body()).exact();
733
734 let response_rid = state
735 .borrow_mut()
736 .resource_table
737 .add(FetchResponseResource::new(res, content_length));
738
739 Ok(FetchResponse {
740 status: status.as_u16(),
741 status_text: status.canonical_reason().unwrap_or("").to_string(),
742 headers: res_headers,
743 url,
744 response_rid,
745 content_length,
746 error: None,
747 })
748}
749
/// Outcome of a fetch request: the outer `Result` reports cancellation, the
/// inner one carries the response or the fetch error.
type CancelableResponseResult =
  Result<Result<http::Response<ResBody>, FetchError>, Canceled>;
752
/// Resource for a request started by `op_fetch` and awaited by
/// `op_fetch_send`.
pub struct FetchRequestResource {
  /// Future that resolves to the outcome of the request.
  pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
  /// URL the request was made for.
  pub url: Url,
}
757
758impl Resource for FetchRequestResource {
759 fn name(&self) -> Cow<'_, str> {
760 "fetchRequest".into()
761 }
762}
763
/// Resource wrapping the cancel handle of an in-flight fetch request.
pub struct FetchCancelHandle(pub Rc<CancelHandle>);
765
766impl Resource for FetchCancelHandle {
767 fn name(&self) -> Cow<'_, str> {
768 "fetchCancelHandle".into()
769 }
770
771 fn close(self: Rc<Self>) {
772 self.0.cancel()
773 }
774}
775
/// Boxed stream of response body chunks with I/O errors.
type BytesStream =
  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
778
/// State of a response body reader.
pub enum FetchResponseReader {
  /// The response is untouched; its body has not been turned into a stream.
  Start(http::Response<ResBody>),
  /// The body is being consumed through a peekable chunk stream.
  BodyReader(Peekable<BytesStream>),
}
783
784impl Default for FetchResponseReader {
785 fn default() -> Self {
786 let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
787 Self::BodyReader(stream.peekable())
788 }
789}
/// Resource holding a fetch response whose body can be read incrementally.
#[derive(Debug)]
pub struct FetchResponseResource {
  /// Reader state, guarded for asynchronous exclusive access.
  pub response_reader: AsyncRefCell<FetchResponseReader>,
  /// Handle used to cancel pending reads; triggered when the resource closes.
  pub cancel: CancelHandle,
  /// Exact body size, when known.
  pub size: Option<u64>,
}
796
797impl FetchResponseResource {
798 pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
799 Self {
800 response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
801 cancel: CancelHandle::default(),
802 size,
803 }
804 }
805
806 pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
807 let reader = self.response_reader.into_inner();
808 match reader {
809 FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
810 _ => unreachable!(),
811 }
812 }
813}
814
impl Resource for FetchResponseResource {
  /// Name under which this resource is reported.
  fn name(&self) -> Cow<'_, str> {
    "fetchResponse".into()
  }

  /// Reads up to `limit` bytes of the response body.
  ///
  /// Resolves to an empty `BufView` at end of stream. Stream errors are
  /// reported as type errors; the read is aborted when the resource's cancel
  /// handle fires.
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
    Box::pin(async move {
      let mut reader =
        RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;

      // Obtain the body stream, converting the untouched response into a
      // stream on first use. The loop runs at most twice.
      let body = loop {
        match &mut *reader {
          FetchResponseReader::BodyReader(reader) => break reader,
          FetchResponseReader::Start(_) => {}
        }

        // Move the response out (leaving the empty default reader behind) so
        // it can be consumed by value.
        match std::mem::take(&mut *reader) {
          FetchResponseReader::Start(resp) => {
            let stream: BytesStream = Box::pin(
              resp
                .into_body()
                .into_data_stream()
                .map(|r| r.map_err(std::io::Error::other)),
            );
            *reader = FetchResponseReader::BodyReader(stream.peekable());
          }
          FetchResponseReader::BodyReader(_) => unreachable!(),
        }
      };
      let fut = async move {
        let mut reader = Pin::new(body);
        loop {
          match reader.as_mut().peek_mut().await {
            Some(Ok(chunk)) if !chunk.is_empty() => {
              // Hand out at most `limit` bytes; the remainder stays in the
              // peeked chunk for the next read.
              let len = min(limit, chunk.len());
              let chunk = chunk.split_to(len);
              break Ok(chunk.into());
            }
            // The peeked item is an error or an empty chunk: consume it.
            // `unwrap` cannot fail because `peek_mut` just returned `Some`.
            Some(_) => match reader.as_mut().next().await.unwrap() {
              Ok(chunk) => assert!(chunk.is_empty()),
              Err(err) => break Err(JsErrorBox::type_error(err.to_string())),
            },
            None => break Ok(BufView::empty()),
          }
        }
      };

      let cancel_handle = RcRef::map(self, |r| &r.cancel);
      fut
        .try_or_cancel(cancel_handle)
        .await
        .map_err(JsErrorBox::from_err)
    })
  }

  /// Reports the exact size when known, otherwise `(0, None)`.
  fn size_hint(&self) -> (u64, Option<u64>) {
    (self.size.unwrap_or(0), self.size)
  }

  /// Cancels any pending read.
  fn close(self: Rc<Self>) {
    self.cancel.cancel()
  }
}
884
/// Resource for a custom HTTP client created by `op_fetch_custom_client`.
pub struct HttpClientResource {
  /// The client used for requests that reference this resource.
  pub client: Client,
  /// Whether requests through this client may carry a caller-supplied `Host`
  /// header.
  pub allow_host: bool,
}
889
890impl Resource for HttpClientResource {
891 fn name(&self) -> Cow<'_, str> {
892 "httpClient".into()
893 }
894}
895
896impl HttpClientResource {
897 fn new(client: Client, allow_host: bool) -> Self {
898 Self { client, allow_host }
899 }
900}
901
/// Arguments of `op_fetch_custom_client`, deserialized from camelCase keys.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CreateHttpClientArgs {
  /// Additional CA certificates; each string is passed on as raw bytes.
  ca_certs: Vec<String>,
  /// Optional proxy; permission-checked before the client is created.
  proxy: Option<Proxy>,
  /// Maximum number of idle pooled connections per host, if set.
  pool_max_idle_per_host: Option<usize>,
  /// Idle timeout setting: absent or `true` keeps the builder default, a
  /// number is a timeout in milliseconds, and any other value (including
  /// `false`) disables the timeout.
  pool_idle_timeout: Option<serde_json::Value>,
  /// Enable HTTP/1; defaults to `true`.
  #[serde(default = "default_true")]
  http1: bool,
  /// Enable HTTP/2; defaults to `true`.
  #[serde(default = "default_true")]
  http2: bool,
  /// Allow a caller-supplied `Host` header; defaults to `false`.
  #[serde(default)]
  allow_host: bool,
  /// Local IP address to bind outgoing connections to, if set.
  local_address: Option<String>,
}
917
/// Serde default provider for boolean fields that default to `true`.
fn default_true() -> bool {
  true
}
921
922#[op2(stack_trace)]
923#[smi]
924#[allow(clippy::result_large_err)]
925pub fn op_fetch_custom_client<FP>(
926 state: &mut OpState,
927 #[serde] mut args: CreateHttpClientArgs,
928 #[cppgc] tls_keys: &TlsKeysHolder,
929) -> Result<ResourceId, FetchError>
930where
931 FP: FetchPermissions + 'static,
932{
933 if let Some(proxy) = &mut args.proxy {
934 let permissions = state.borrow_mut::<FP>();
935 match proxy {
936 Proxy::Http { url, .. } => {
937 let url = Url::parse(url)?;
938 permissions.check_net_url(&url, "Deno.createHttpClient()")?;
939 }
940 Proxy::Tcp { hostname, port } => {
941 permissions.check_net(hostname, *port, "Deno.createHttpClient()")?;
942 }
943 Proxy::Unix {
944 path: original_path,
945 } => {
946 let path = Path::new(original_path);
947 let resolved_path = permissions
948 .check_open(
949 Cow::Borrowed(path),
950 OpenAccessKind::ReadWriteNoFollow,
951 "Deno.createHttpClient()",
952 )?
953 .into_path();
954 if path != resolved_path {
955 *original_path = resolved_path.to_string_lossy().into_owned();
956 }
957 }
958 Proxy::Vsock { cid, port } => {
959 let permissions = state.borrow_mut::<FP>();
960 permissions.check_net_vsock(*cid, *port, "Deno.createHttpClient()")?;
961 }
962 }
963 }
964
965 let options = state.borrow::<Options>();
966 let ca_certs = args
967 .ca_certs
968 .into_iter()
969 .map(|cert| cert.into_bytes())
970 .collect::<Vec<_>>();
971
972 let client = create_http_client(
973 &options.user_agent,
974 CreateHttpClientOptions {
975 root_cert_store: options
976 .root_cert_store()
977 .map_err(HttpClientCreateError::RootCertStore)?,
978 ca_certs,
979 proxy: args.proxy,
980 dns_resolver: dns::Resolver::default(),
981 unsafely_ignore_certificate_errors: options
982 .unsafely_ignore_certificate_errors
983 .clone(),
984 client_cert_chain_and_key: tls_keys.take().try_into().unwrap(),
985 pool_max_idle_per_host: args.pool_max_idle_per_host,
986 pool_idle_timeout: args.pool_idle_timeout.and_then(
987 |timeout| match timeout {
988 serde_json::Value::Bool(true) => None,
989 serde_json::Value::Bool(false) => Some(None),
990 serde_json::Value::Number(specify) => {
991 Some(Some(specify.as_u64().unwrap_or_default()))
992 }
993 _ => Some(None),
994 },
995 ),
996 http1: args.http1,
997 http2: args.http2,
998 local_address: args.local_address,
999 client_builder_hook: options.client_builder_hook,
1000 },
1001 )?;
1002
1003 let rid = state
1004 .resource_table
1005 .add(HttpClientResource::new(client, args.allow_host));
1006 Ok(rid)
1007}
1008
/// Settings consumed by `create_http_client`.
#[derive(Debug, Clone)]
pub struct CreateHttpClientOptions {
  /// Root certificate store for the TLS configuration, if any.
  pub root_cert_store: Option<RootCertStore>,
  /// Additional CA certificates as raw bytes.
  pub ca_certs: Vec<Vec<u8>>,
  /// Explicit proxy; takes precedence over proxies read from the environment.
  pub proxy: Option<Proxy>,
  /// DNS resolver used by the HTTP connector.
  pub dns_resolver: dns::Resolver,
  /// Forwarded unchanged to the TLS client configuration.
  pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
  /// Client certificate chain and key, if any.
  pub client_cert_chain_and_key: Option<TlsKey>,
  /// Maximum number of idle pooled connections per host; `None` keeps the
  /// builder default.
  pub pool_max_idle_per_host: Option<usize>,
  /// Idle timeout in milliseconds: outer `None` keeps the builder default,
  /// `Some(None)` passes `None` to the builder.
  pub pool_idle_timeout: Option<Option<u64>>,
  /// Offer HTTP/1.1.
  pub http1: bool,
  /// Offer HTTP/2.
  pub http2: bool,
  /// Local IP address to bind outgoing connections to, as a string.
  pub local_address: Option<String>,
  /// Optional hook that can replace the hyper client builder.
  pub client_builder_hook: Option<fn(HyperClientBuilder) -> HyperClientBuilder>,
}
1024
1025impl Default for CreateHttpClientOptions {
1026 fn default() -> Self {
1027 CreateHttpClientOptions {
1028 root_cert_store: None,
1029 ca_certs: vec![],
1030 proxy: None,
1031 dns_resolver: dns::Resolver::default(),
1032 unsafely_ignore_certificate_errors: None,
1033 client_cert_chain_and_key: None,
1034 pool_max_idle_per_host: None,
1035 pool_idle_timeout: None,
1036 http1: true,
1037 http2: true,
1038 local_address: None,
1039 client_builder_hook: None,
1040 }
1041 }
1042}
1043
/// Errors raised while building an HTTP client.
///
/// Reported with the `type` error class unless a variant overrides it.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
#[class(type)]
pub enum HttpClientCreateError {
  /// The TLS client configuration could not be created.
  #[error(transparent)]
  Tls(deno_tls::TlsError),
  /// The user agent is not a valid header value.
  #[error("Illegal characters in User-Agent: received {0}")]
  InvalidUserAgent(String),
  /// The local address could not be parsed as an IP address.
  #[error("Invalid address: {0}")]
  InvalidAddress(String),
  /// The HTTP proxy URL could not be parsed.
  #[error("invalid proxy url")]
  InvalidProxyUrl,
  /// Both `http1` and `http2` were disabled.
  #[error(
    "Cannot create Http Client: either `http1` or `http2` needs to be set to true"
  )]
  HttpVersionSelectionInvalid,
  /// The root certificate store could not be obtained.
  #[class(inherit)]
  #[error(transparent)]
  RootCertStore(JsErrorBox),
  #[error("Unix proxy is not supported on Windows")]
  UnixProxyNotSupportedOnWindows,
  #[error("Vsock proxy is not supported on this platform")]
  VsockProxyNotSupported,
}
1067
/// Builds a `Client` that identifies itself as `user_agent` and is configured
/// from `options`.
///
/// The resulting client stacks response decompression (gzip and brotli) on top
/// of a retry layer on top of a pooled hyper client that connects through a
/// proxy-aware connector.
///
/// # Errors
///
/// Returns an `HttpClientCreateError` when the TLS configuration cannot be
/// created, the local address or user agent is invalid, the proxy is invalid
/// or unsupported on this platform, or both `http1` and `http2` are disabled.
pub fn create_http_client(
  user_agent: &str,
  options: CreateHttpClientOptions,
) -> Result<Client, HttpClientCreateError> {
  let mut tls_config =
    deno_tls::create_client_config(deno_tls::TlsClientConfigOptions {
      root_cert_store: options.root_cert_store,
      ca_certs: options.ca_certs,
      unsafely_ignore_certificate_errors: options
        .unsafely_ignore_certificate_errors,
      unsafely_disable_hostname_verification: false,
      cert_chain_and_key: options.client_cert_chain_and_key.into(),
      socket_use: deno_tls::SocketUse::Http,
    })
    .map_err(HttpClientCreateError::Tls)?;

  // The TLS configuration used towards proxies carries no ALPN protocols.
  tls_config.alpn_protocols.clear();
  let proxy_tls_config = Arc::from(tls_config.clone());

  // The configuration used towards origins advertises the enabled versions,
  // HTTP/2 first.
  let mut alpn_protocols = vec![];
  if options.http2 {
    alpn_protocols.push("h2".into());
  }
  if options.http1 {
    alpn_protocols.push("http/1.1".into());
  }
  tls_config.alpn_protocols = alpn_protocols;
  let tls_config = Arc::from(tls_config);

  let mut http_connector =
    HttpConnector::new_with_resolver(options.dns_resolver.clone());
  // Do not restrict the connector to `http` URLs.
  http_connector.enforce_http(false);
  if let Some(local_address) = options.local_address {
    let local_addr = local_address
      .parse::<IpAddr>()
      .map_err(|_| HttpClientCreateError::InvalidAddress(local_address))?;
    http_connector.set_local_address(Some(local_addr));
  }

  let user_agent = user_agent.parse::<HeaderValue>().map_err(|_| {
    HttpClientCreateError::InvalidUserAgent(user_agent.to_string())
  })?;

  let mut builder = HyperClientBuilder::new(TokioExecutor::new());
  builder.timer(TokioTimer::new());
  builder.pool_timer(TokioTimer::new());

  // The hook runs before the pool settings below are applied.
  if let Some(client_builder_hook) = options.client_builder_hook {
    builder = client_builder_hook(builder);
  }

  // Start from the environment's proxies; an explicit proxy is put in front.
  let mut proxies = proxy::from_env();
  if let Some(proxy) = options.proxy {
    let intercept = match proxy {
      Proxy::Http { url, basic_auth } => {
        let target = proxy::Target::parse(&url)
          .ok_or_else(|| HttpClientCreateError::InvalidProxyUrl)?;
        let mut intercept = proxy::Intercept::all(target);
        if let Some(basic_auth) = &basic_auth {
          intercept.set_auth(&basic_auth.username, &basic_auth.password);
        }
        intercept
      }
      Proxy::Tcp {
        hostname: host,
        port,
      } => {
        let target = proxy::Target::new_tcp(host, port);
        proxy::Intercept::all(target)
      }
      #[cfg(not(windows))]
      Proxy::Unix { path } => {
        let target = proxy::Target::new_unix(PathBuf::from(path));
        proxy::Intercept::all(target)
      }
      #[cfg(windows)]
      Proxy::Unix { .. } => {
        return Err(HttpClientCreateError::UnixProxyNotSupportedOnWindows);
      }
      #[cfg(any(
        target_os = "android",
        target_os = "linux",
        target_os = "macos"
      ))]
      Proxy::Vsock { cid, port } => {
        let target = proxy::Target::new_vsock(cid, port);
        proxy::Intercept::all(target)
      }
      #[cfg(not(any(
        target_os = "android",
        target_os = "linux",
        target_os = "macos"
      )))]
      Proxy::Vsock { .. } => {
        return Err(HttpClientCreateError::VsockProxyNotSupported);
      }
    };
    proxies.prepend(intercept);
  }
  let proxies = Arc::new(proxies);
  let connector = proxy::ProxyConnector {
    http: http_connector,
    proxies,
    tls: tls_config,
    tls_proxy: proxy_tls_config,
    user_agent: Some(user_agent.clone()),
  };

  if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
    builder.pool_max_idle_per_host(pool_max_idle_per_host);
  }

  if let Some(pool_idle_timeout) = options.pool_idle_timeout {
    builder.pool_idle_timeout(
      pool_idle_timeout.map(std::time::Duration::from_millis),
    );
  }

  match (options.http1, options.http2) {
    // Nothing to do on the builder; the ALPN list above already reflects it.
    (true, false) => {}
    (false, true) => {
      builder.http2_only(true);
    }
    (true, true) => {}
    (false, false) => {
      return Err(HttpClientCreateError::HttpVersionSelectionInvalid);
    }
  }

  let pooled_client = builder.build(connector.clone());
  let retry_client = retry::Retry::new(FetchRetry, pooled_client);
  let decompress = Decompression::new(retry_client).gzip(true).br(true);

  Ok(Client {
    inner: decompress,
    connector,
    user_agent,
  })
}
1210
1211#[op2]
1212#[serde]
1213pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString {
1214 input.into()
1215}
1216
/// HTTP client handle that pairs the request-sending service stack with the
/// connector and default `User-Agent` value it was built with.
#[derive(Clone, Debug)]
pub struct Client {
  /// Service stack driven by `send`: a decompression layer wrapping the
  /// `FetchRetry` retry layer, which in turn wraps the hyper client.
  inner: Decompression<
    retry::Retry<
      FetchRetry,
      hyper_util::client::legacy::Client<Connector, ReqBody>,
    >,
  >,
  /// Connector used by `connect` to open connections directly and consulted
  /// by `inject_common_headers` for proxy authorization.
  connector: Connector,
  /// Value inserted as the `User-Agent` header when a request has none.
  user_agent: HeaderValue,
}
1228
/// Errors returned by `Client::connect`.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum ClientConnectError {
  /// An HTTP/1-only socket was requested but the connector's `h1_only()`
  /// returned `None`.
  #[class(type)]
  #[error("HTTP/1.1 not supported by this client")]
  Http1NotSupported,
  /// An HTTP/2-only socket was requested but the connector's `h2_only()`
  /// returned `None`.
  #[class(type)]
  #[error("HTTP/2 not supported by this client")]
  Http2NotSupported,
  /// The connector itself failed while establishing the connection; the
  /// wrapped error is displayed transparently.
  #[class(generic)]
  #[error(transparent)]
  Connector(BoxError),
}
1241
1242impl Client {
1243 pub async fn connect(
1244 &self,
1245 uri: Uri,
1246 socket_use: SocketUse,
1247 ) -> Result<
1248 impl tokio::io::AsyncRead
1249 + tokio::io::AsyncWrite
1250 + Connection
1251 + Unpin
1252 + Send
1253 + 'static,
1254 ClientConnectError,
1255 > {
1256 let mut connector = match socket_use {
1257 SocketUse::Http1Only => {
1258 let Some(connector) = self.connector.clone().h1_only() else {
1259 return Err(ClientConnectError::Http1NotSupported);
1260 };
1261 connector
1262 }
1263 SocketUse::Http2Only => {
1264 let Some(connector) = self.connector.clone().h2_only() else {
1265 return Err(ClientConnectError::Http2NotSupported);
1266 };
1267 connector
1268 }
1269 _ => self.connector.clone(),
1270 };
1271 let connection = connector
1272 .call(uri)
1273 .await
1274 .map_err(ClientConnectError::Connector)?;
1275 Ok(TokioIo::new(connection))
1276 }
1277}
1278
/// Connector type used by `Client`: the proxy connector wrapping an
/// `HttpConnector` that resolves host names through `dns::Resolver`.
type Connector = proxy::ProxyConnector<HttpConnector<dns::Resolver>>;
1280
/// `Accept` header value (`*/*`) that `Client::send` inserts when the request
/// does not already carry an `Accept` header.
// NOTE(review): the lint is silenced because clippy classifies `HeaderValue`
// as interior-mutable (presumably due to its internal buffer type - confirm).
// The constant is only ever used as a value to insert, never mutated in place.
#[allow(clippy::declare_interior_mutable_const)]
const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*");
1284
/// Error produced by `Client::send` when the underlying client fails to send
/// a request.
#[derive(Debug, deno_error::JsError)]
#[class(type)]
pub struct ClientSendError {
  /// URI of the request that failed; included in the `Display` output.
  uri: Uri,
  /// The underlying hyper client error.
  pub source: hyper_util::client::legacy::Error,
}
1291
1292impl ClientSendError {
1293 pub fn is_connect_error(&self) -> bool {
1294 self.source.is_connect()
1295 }
1296
1297 fn http_info(&self) -> Option<HttpInfo> {
1298 let mut exts = Extensions::new();
1299 self.source.connect_info()?.get_extras(&mut exts);
1300 exts.remove::<HttpInfo>()
1301 }
1302}
1303
1304impl std::fmt::Display for ClientSendError {
1305 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1306 let detail = error_reporter::Report::new(&self.source);
1308
1309 match self.http_info() {
1310 Some(http_info) => {
1311 write!(
1312 f,
1313 "error sending request from {src} for {uri} ({dst}): {detail}",
1314 src = http_info.local_addr(),
1315 uri = self.uri,
1316 dst = http_info.remote_addr(),
1317 detail = detail,
1318 )
1319 }
1320 None => {
1321 write!(
1322 f,
1323 "error sending request for url ({uri}): {detail}",
1324 uri = self.uri,
1325 detail = detail,
1326 )
1327 }
1328 }
1329 }
1330}
1331
1332impl std::error::Error for ClientSendError {
1333 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1334 Some(&self.source)
1335 }
1336}
1337
/// The subset of request access that `Client::inject_common_headers` needs:
/// reading the URI and mutating the headers.
pub trait CommonRequest {
  /// Returns the request's URI.
  fn uri(&self) -> &Uri;
  /// Returns mutable access to the request's header map.
  fn headers_mut(&mut self) -> &mut HeaderMap;
}
1342
1343impl CommonRequest for http::Request<ReqBody> {
1344 fn uri(&self) -> &Uri {
1345 self.uri()
1346 }
1347
1348 fn headers_mut(&mut self) -> &mut HeaderMap {
1349 self.headers_mut()
1350 }
1351}
1352
1353impl CommonRequest for http::request::Builder {
1354 fn uri(&self) -> &Uri {
1355 http::request::Builder::uri_ref(self).expect("uri not set")
1356 }
1357
1358 fn headers_mut(&mut self) -> &mut HeaderMap {
1359 http::request::Builder::headers_mut(self).expect("headers not set")
1360 }
1361}
1362
1363impl Client {
1364 pub fn inject_common_headers(&self, req: &mut impl CommonRequest) {
1366 req
1367 .headers_mut()
1368 .entry(USER_AGENT)
1369 .or_insert_with(|| self.user_agent.clone());
1370
1371 if let Some(auth) = self.connector.proxies.http_forward_auth(req.uri()) {
1372 req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
1373 }
1374 }
1375
1376 pub async fn send(
1377 self,
1378 mut req: http::Request<ReqBody>,
1379 ) -> Result<http::Response<ResBody>, ClientSendError> {
1380 self.inject_common_headers(&mut req);
1381
1382 req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
1383
1384 let uri = req.uri().clone();
1385
1386 let resp = self
1387 .inner
1388 .oneshot(req)
1389 .await
1390 .map_err(|e| ClientSendError { uri, source: e })?;
1391 Ok(resp.map(|b| b.map_err(|e| JsErrorBox::generic(e.to_string())).boxed()))
1392 }
1393}
1394
/// Request body type accepted by `Client::send`.
pub enum ReqBody {
  /// A fully buffered body.
  Full(http_body_util::Full<Bytes>),
  /// A body with no data.
  Empty(http_body_util::Empty<Bytes>),
  /// A boxed streaming body. Requests with this kind of body are not cloned
  /// by the retry policy, so they are never retried.
  Streaming(BoxBody<Bytes, JsErrorBox>),
}
1401
/// Response body type returned by `Client::send`: a boxed body yielding
/// `Bytes` frames and reporting errors as `JsErrorBox`.
pub type ResBody = BoxBody<Bytes, JsErrorBox>;
1403
1404impl ReqBody {
1405 pub fn full(bytes: Bytes) -> Self {
1406 ReqBody::Full(http_body_util::Full::new(bytes))
1407 }
1408
1409 pub fn empty() -> Self {
1410 ReqBody::Empty(http_body_util::Empty::new())
1411 }
1412
1413 pub fn streaming<B>(body: B) -> Self
1414 where
1415 B: hyper::body::Body<Data = Bytes, Error = JsErrorBox>
1416 + Send
1417 + Sync
1418 + 'static,
1419 {
1420 ReqBody::Streaming(BoxBody::new(body))
1421 }
1422}
1423
1424impl hyper::body::Body for ReqBody {
1425 type Data = Bytes;
1426 type Error = JsErrorBox;
1427
1428 fn poll_frame(
1429 mut self: Pin<&mut Self>,
1430 cx: &mut Context<'_>,
1431 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1432 match &mut *self {
1433 ReqBody::Full(b) => {
1434 Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1435 }
1436 ReqBody::Empty(b) => {
1437 Pin::new(b).poll_frame(cx).map_err(|never| match never {})
1438 }
1439 ReqBody::Streaming(b) => Pin::new(b).poll_frame(cx),
1440 }
1441 }
1442
1443 fn is_end_stream(&self) -> bool {
1444 match self {
1445 ReqBody::Full(b) => b.is_end_stream(),
1446 ReqBody::Empty(b) => b.is_end_stream(),
1447 ReqBody::Streaming(b) => b.is_end_stream(),
1448 }
1449 }
1450
1451 fn size_hint(&self) -> hyper::body::SizeHint {
1452 match self {
1453 ReqBody::Full(b) => b.size_hint(),
1454 ReqBody::Empty(b) => b.size_hint(),
1455 ReqBody::Streaming(b) => b.size_hint(),
1456 }
1457 }
1458}
1459
1460pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
1464 use percent_encoding::percent_decode;
1465
1466 if url.has_authority() {
1467 let username: String = percent_decode(url.username().as_bytes())
1468 .decode_utf8()
1469 .ok()?
1470 .into();
1471 let password = url.password().and_then(|pass| {
1472 percent_decode(pass.as_bytes())
1473 .decode_utf8()
1474 .ok()
1475 .map(String::from)
1476 });
1477 if !username.is_empty() || password.is_some() {
1478 url
1479 .set_username("")
1480 .expect("has_authority means set_username shouldn't fail");
1481 url
1482 .set_password(None)
1483 .expect("has_authority means set_password shouldn't fail");
1484 return Some((username, password));
1485 }
1486 }
1487
1488 None
1489}
1490
1491#[op2(fast)]
1492fn op_fetch_promise_is_settled(promise: v8::Local<v8::Promise>) -> bool {
1493 promise.state() != v8::PromiseState::Pending
1494}
1495
/// Retry policy plugged into the client's retry layer. It retries a request
/// at most once, and only for errors that `is_error_retryable` accepts.
#[derive(Clone, Debug)]
struct FetchRetry;
1499
/// Marker stored in a request's extensions once `FetchRetry` has decided to
/// retry it; its presence stops any further retry of that request.
#[derive(Clone, Debug)]
struct Retried;
1503
1504impl<ResBody, E>
1505 retry::Policy<http::Request<ReqBody>, http::Response<ResBody>, E>
1506 for FetchRetry
1507where
1508 E: std::error::Error + 'static,
1509{
1510 type Future = future::Ready<()>;
1512
1513 fn retry(
1514 &mut self,
1515 req: &mut http::Request<ReqBody>,
1516 result: &mut Result<http::Response<ResBody>, E>,
1517 ) -> Option<Self::Future> {
1518 if req.extensions().get::<Retried>().is_some() {
1519 return None;
1521 }
1522
1523 match result {
1524 Ok(..) => {
1525 None
1527 }
1528 Err(err) => {
1529 if is_error_retryable(&*err) {
1530 req.extensions_mut().insert(Retried);
1531 Some(future::ready(()))
1532 } else {
1533 None
1534 }
1535 }
1536 }
1537 }
1538
1539 fn clone_request(
1540 &mut self,
1541 req: &http::Request<ReqBody>,
1542 ) -> Option<http::Request<ReqBody>> {
1543 let body = match req.body() {
1544 ReqBody::Full(b) => ReqBody::Full(b.clone()),
1545 ReqBody::Empty(b) => ReqBody::Empty(*b),
1546 ReqBody::Streaming(..) => return None,
1547 };
1548
1549 let mut clone = http::Request::new(body);
1550 *clone.method_mut() = req.method().clone();
1551 *clone.uri_mut() = req.uri().clone();
1552 *clone.headers_mut() = req.headers().clone();
1553 *clone.extensions_mut() = req.extensions().clone();
1554 Some(clone)
1555 }
1556}
1557
1558fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool {
1559 if let Some(err) = find_source::<h2::Error>(err) {
1561 if err.is_go_away()
1563 && err.is_remote()
1564 && err.reason() == Some(h2::Reason::NO_ERROR)
1565 {
1566 return true;
1567 }
1568
1569 if err.is_reset()
1572 && err.is_remote()
1573 && err.reason() == Some(h2::Reason::REFUSED_STREAM)
1574 {
1575 return true;
1576 }
1577 }
1578
1579 false
1580}
1581
/// Walks the `source()` chain of `err`, starting with `err` itself, and
/// returns the first error in the chain whose concrete type is `E`.
///
/// Returns `None` when no error in the chain downcasts to `E`.
fn find_source<'a, E: std::error::Error + 'static>(
  err: &'a (dyn std::error::Error + 'static),
) -> Option<&'a E> {
  // `|&cause|` copies the inner reference out, so each next link borrows from
  // the chain itself (`'a`) rather than from the iterator's temporary.
  std::iter::successors(Some(err), |&cause| cause.source())
    .find_map(|cause| cause.downcast_ref::<E>())
}