Skip to main content

deno_http/
lib.rs

1// Copyright 2018-2026 the Deno authors. MIT license.
2
3use std::borrow::Cow;
4use std::cell::RefCell;
5use std::cmp::min;
6use std::error::Error;
7use std::future::Future;
8use std::future::Pending;
9use std::future::pending;
10use std::io;
11use std::io::Write;
12use std::mem::replace;
13use std::mem::take;
14use std::net::SocketAddr;
15use std::pin::Pin;
16use std::pin::pin;
17use std::rc::Rc;
18use std::sync::Arc;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::task::Context;
22use std::task::Poll;
23use std::task::ready;
24
25use async_compression::Level;
26use async_compression::tokio::write::BrotliEncoder;
27use async_compression::tokio::write::GzipEncoder;
28use base64::Engine;
29use base64::prelude::BASE64_STANDARD;
30use cache_control::CacheControl;
31use deno_core::AsyncRefCell;
32use deno_core::AsyncResult;
33use deno_core::BufView;
34use deno_core::CancelFuture;
35use deno_core::CancelHandle;
36use deno_core::CancelTryFuture;
37use deno_core::JsBuffer;
38use deno_core::OpState;
39use deno_core::RcRef;
40use deno_core::Resource;
41use deno_core::ResourceId;
42use deno_core::StringOrBuffer;
43use deno_core::convert::ByteString;
44use deno_core::futures::FutureExt;
45use deno_core::futures::StreamExt;
46use deno_core::futures::TryFutureExt;
47use deno_core::futures::channel::mpsc;
48use deno_core::futures::channel::oneshot;
49use deno_core::futures::future::Either;
50use deno_core::futures::future::RemoteHandle;
51use deno_core::futures::future::Shared;
52use deno_core::futures::future::select;
53use deno_core::futures::never::Never;
54use deno_core::futures::stream::Peekable;
55use deno_core::op2;
56use deno_core::unsync::spawn;
57use deno_error::JsErrorBox;
58use deno_net::raw::NetworkStream;
59use deno_telemetry::Histogram;
60use deno_telemetry::MeterProvider;
61use deno_telemetry::OTEL_GLOBALS;
62use deno_telemetry::UpDownCounter;
63use deno_websocket::ws_create_server_stream;
64use flate2::Compression;
65use flate2::write::GzEncoder;
66use hyper::server::conn::http1;
67use hyper::server::conn::http2;
68use hyper_util::rt::TokioIo;
69use hyper_v014::Body;
70use hyper_v014::HeaderMap;
71use hyper_v014::Request;
72use hyper_v014::Response;
73use hyper_v014::body::Bytes;
74use hyper_v014::body::HttpBody;
75use hyper_v014::body::SizeHint;
76use hyper_v014::header::HeaderName;
77use hyper_v014::header::HeaderValue;
78use hyper_v014::server::conn::Http;
79use hyper_v014::service::Service;
80use once_cell::sync::OnceCell;
81use tokio::io::AsyncRead;
82use tokio::io::AsyncWrite;
83use tokio::io::AsyncWriteExt;
84use tokio::net::TcpStream;
85use tokio::sync::Notify;
86
87use crate::network_buffered_stream::NetworkBufferedStream;
88use crate::reader_stream::ExternallyAbortableReaderStream;
89use crate::reader_stream::ShutdownHandle;
90
91pub mod compressible;
92mod fly_accept_encoding;
93mod http_next;
94mod network_buffered_stream;
95mod reader_stream;
96mod request_body;
97mod request_properties;
98mod response_body;
99mod service;
100
101use fly_accept_encoding::Encoding;
102pub use http_next::HttpNextError;
103pub use request_properties::DefaultHttpPropertyExtractor;
104pub use request_properties::HttpConnectionProperties;
105pub use request_properties::HttpListenProperties;
106pub use request_properties::HttpPropertyExtractor;
107pub use request_properties::HttpRequestProperties;
108pub use service::UpgradeUnavailableError;
109
/// OpenTelemetry metric instruments used for HTTP server requests. The set is
/// built once by `OtelInfo::new` and cached in `OTEL_COLLECTORS`.
struct OtelCollectors {
  /// `http.server.request.duration` histogram (unit: seconds).
  duration: Histogram<f64>,
  /// `http.server.active_requests` up/down counter.
  active_requests: UpDownCounter<i64>,
  /// `http.server.request.body.size` histogram (unit: bytes).
  request_size: Histogram<u64>,
  /// `http.server.response.body.size` histogram (unit: bytes).
  response_size: Histogram<u64>,
}
116
/// Lazily initialised metric instruments; populated by the first call to
/// `OtelInfo::new` and only read afterwards.
static OTEL_COLLECTORS: OnceCell<OtelCollectors> = OnceCell::new();
118
/// Configuration passed to the `deno_http` extension. The extension's `state`
/// hook stores it in the `OpState`.
#[derive(Debug, Default, Clone, Copy)]
pub struct Options {
  /// By passing a hook function, the caller can customize various configuration
  /// options for the HTTP/2 server.
  /// See [`http2::Builder`] for what parameters can be customized.
  ///
  /// If `None`, the default configuration provided by hyper will be used. Note
  /// that the default configuration is subject to change in future versions.
  pub http2_builder_hook:
    Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>,
  /// By passing a hook function, the caller can customize various configuration
  /// options for the HTTP/1 server.
  /// See [`http1::Builder`] for what parameters can be customized.
  ///
  /// If `None`, the default configuration provided by hyper will be used. Note
  /// that the default configuration is subject to change in future versions.
  pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,

  /// If `false`, the server will abort the request when the response is dropped.
  // NOTE(review): the field is not read in this part of the file; confirm the
  // exact semantics against its consumer.
  pub no_legacy_abort: bool,
}
140
141#[cfg(not(feature = "default_property_extractor"))]
142deno_core::extension!(
143  deno_http,
144  deps = [deno_web, deno_net, deno_fetch, deno_websocket],
145  parameters = [ HTTP: HttpPropertyExtractor ],
146  ops = [
147    op_http_accept,
148    op_http_headers,
149    op_http_serve_address_override,
150    op_http_shutdown,
151    op_http_upgrade_websocket,
152    op_http_websocket_accept_header,
153    op_http_write_headers,
154    op_http_write_resource,
155    op_http_write,
156    http_next::op_http_close_after_finish,
157    http_next::op_http_get_request_header,
158    http_next::op_http_get_request_headers,
159    http_next::op_http_request_on_cancel,
160    http_next::op_http_get_request_method_and_url<HTTP>,
161    http_next::op_http_get_request_cancelled,
162    http_next::op_http_read_request_body,
163    http_next::op_http_serve_on<HTTP>,
164    http_next::op_http_serve<HTTP>,
165    http_next::op_http_set_promise_complete,
166    http_next::op_http_set_response_body_bytes,
167    http_next::op_http_set_response_body_resource,
168    http_next::op_http_set_response_body_text,
169    http_next::op_http_set_response_header,
170    http_next::op_http_set_response_headers,
171    http_next::op_http_set_response_trailers,
172    http_next::op_http_upgrade_websocket_next,
173    http_next::op_http_upgrade_raw,
174    http_next::op_raw_write_vectored,
175    http_next::op_can_write_vectored,
176    http_next::op_http_try_wait,
177    http_next::op_http_wait,
178    http_next::op_http_close,
179    http_next::op_http_cancel,
180    http_next::op_http_metric_handle_otel_error,
181  ],
182  esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
183  options = {
184    options: Options,
185  },
186  state = |state, options| {
187    state.put::<Options>(options.options);
188  }
189);
190
// Extension definition used when the `default_property_extractor` feature is
// enabled: the generic ops are instantiated with
// `DefaultHttpPropertyExtractor`, so the extension takes no type parameter.
#[cfg(feature = "default_property_extractor")]
deno_core::extension!(
  deno_http,
  deps = [deno_web, deno_net, deno_fetch, deno_websocket],
  ops = [
    op_http_accept,
    op_http_headers,
    op_http_serve_address_override,
    op_http_shutdown,
    op_http_upgrade_websocket,
    op_http_websocket_accept_header,
    op_http_write_headers,
    op_http_write_resource,
    op_http_write,
    op_http_notify_serving,
    http_next::op_http_close_after_finish,
    http_next::op_http_get_request_header,
    http_next::op_http_get_request_headers,
    http_next::op_http_request_on_cancel,
    http_next::op_http_get_request_method_and_url<DefaultHttpPropertyExtractor>,
    http_next::op_http_get_request_cancelled,
    http_next::op_http_read_request_body,
    http_next::op_http_serve_on<DefaultHttpPropertyExtractor>,
    http_next::op_http_serve<DefaultHttpPropertyExtractor>,
    http_next::op_http_set_promise_complete,
    http_next::op_http_set_response_body_bytes,
    http_next::op_http_set_response_body_resource,
    http_next::op_http_set_response_body_text,
    http_next::op_http_set_response_header,
    http_next::op_http_set_response_headers,
    http_next::op_http_set_response_trailers,
    http_next::op_http_upgrade_websocket_next,
    http_next::op_http_upgrade_raw,
    http_next::op_raw_write_vectored,
    http_next::op_can_write_vectored,
    http_next::op_http_try_wait,
    http_next::op_http_wait,
    http_next::op_http_close,
    http_next::op_http_cancel,
    http_next::op_http_metric_handle_otel_error,
  ],
  esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
  options = {
    options: Options,
  },
  state = |state, options| {
    // Make the embedder-supplied options available to the ops.
    state.put::<Options>(options.options);
  }
);
240
/// Errors produced by the HTTP ops and resources defined in this file.
///
/// The `#[class(..)]` attribute on each variant selects the JavaScript error
/// class; `inherit` defers to the class of the wrapped error.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum HttpError {
  // Resource table lookup failure.
  #[class(inherit)]
  #[error(transparent)]
  Resource(#[from] deno_core::error::ResourceError),
  // The operation was interrupted through a cancel handle.
  #[class(inherit)]
  #[error(transparent)]
  Canceled(#[from] deno_core::Canceled),
  // Error reported by hyper 0.14. Wrapped in an `Arc`, which matches the
  // output type of `HttpConnResource::closed_fut`.
  #[class("Http")]
  #[error("{0}")]
  HyperV014(#[source] Arc<hyper_v014::Error>),
  #[class(generic)]
  #[error("{0}")]
  InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
  #[class(generic)]
  #[error("{0}")]
  InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
  #[class(generic)]
  #[error("{0}")]
  Http(#[from] hyper_v014::http::Error),
  #[class("Http")]
  #[error("response headers already sent")]
  ResponseHeadersAlreadySent,
  #[class("Http")]
  #[error("connection closed while sending response")]
  ConnectionClosedWhileSendingResponse,
  #[class("Http")]
  #[error("already in use")]
  AlreadyInUse,
  #[class(inherit)]
  #[error("{0}")]
  Io(#[from] std::io::Error),
  #[class("Http")]
  #[error("no response headers")]
  NoResponseHeaders,
  #[class("Http")]
  #[error("response already completed")]
  ResponseAlreadyCompleted,
  #[class("Http")]
  #[error("cannot upgrade because request body was used")]
  UpgradeBodyUsed,
  // Any other boxed JS error; always surfaced with the "Http" class.
  #[class("Http")]
  #[error(transparent)]
  Other(#[from] JsErrorBox),
}
286
/// Local address of the socket an HTTP connection was accepted on.
pub enum HttpSocketAddr {
  /// A TCP/IP socket address.
  IpSocket(std::net::SocketAddr),
  /// A Unix domain socket address (Unix targets only).
  #[cfg(unix)]
  UnixSocket(tokio::net::unix::SocketAddr),
}
292
293impl From<std::net::SocketAddr> for HttpSocketAddr {
294  fn from(addr: std::net::SocketAddr) -> Self {
295    Self::IpSocket(addr)
296  }
297}
298
299#[cfg(unix)]
300impl From<tokio::net::unix::SocketAddr> for HttpSocketAddr {
301  fn from(addr: tokio::net::unix::SocketAddr) -> Self {
302    Self::UnixSocket(addr)
303  }
304}
305
/// Per-request OpenTelemetry bookkeeping. Creating one increments the active
/// request counter; dropping it decrements the counter and records whatever
/// measurements are still pending.
struct OtelInfo {
  /// Attributes attached to every measurement of this request.
  attributes: OtelInfoAttributes,
  /// Start instant of the request; taken (set to `None`) once the duration
  /// has been recorded.
  duration: Option<std::time::Instant>,
  /// Request body size; taken (set to `None`) once it has been recorded.
  request_size: Option<u64>,
  /// Response body size, recorded when the value is dropped.
  response_size: Option<u64>,
}
312
/// Attribute values attached to the HTTP server metrics of a single request.
struct OtelInfoAttributes {
  /// Reported as `http.request.method`.
  http_request_method: Cow<'static, str>,
  /// Reported as `network.protocol.version` (histograms only).
  network_protocol_version: &'static str,
  /// Reported as `url.scheme`.
  url_scheme: Cow<'static, str>,
  /// Reported as `server.address` when present.
  server_address: Option<String>,
  /// Reported as `server.port` when present.
  server_port: Option<i64>,
  /// Reported as `error.type` when present (histograms only).
  error_type: Option<&'static str>,
  /// Reported as `http.response.status_code` when present (histograms only).
  http_response_status_code: Option<i64>,
}
322
323impl OtelInfoAttributes {
324  fn method(method: &http::method::Method) -> Cow<'static, str> {
325    use http::method::Method;
326
327    match *method {
328      Method::GET => Cow::Borrowed("GET"),
329      Method::POST => Cow::Borrowed("POST"),
330      Method::PUT => Cow::Borrowed("PUT"),
331      Method::DELETE => Cow::Borrowed("DELETE"),
332      Method::HEAD => Cow::Borrowed("HEAD"),
333      Method::OPTIONS => Cow::Borrowed("OPTIONS"),
334      Method::CONNECT => Cow::Borrowed("CONNECT"),
335      Method::PATCH => Cow::Borrowed("PATCH"),
336      Method::TRACE => Cow::Borrowed("TRACE"),
337      _ => Cow::Owned(method.to_string()),
338    }
339  }
340
341  fn method_v02(method: &http_v02::method::Method) -> Cow<'static, str> {
342    use http_v02::method::Method;
343
344    match *method {
345      Method::GET => Cow::Borrowed("GET"),
346      Method::POST => Cow::Borrowed("POST"),
347      Method::PUT => Cow::Borrowed("PUT"),
348      Method::DELETE => Cow::Borrowed("DELETE"),
349      Method::HEAD => Cow::Borrowed("HEAD"),
350      Method::OPTIONS => Cow::Borrowed("OPTIONS"),
351      Method::CONNECT => Cow::Borrowed("CONNECT"),
352      Method::PATCH => Cow::Borrowed("PATCH"),
353      Method::TRACE => Cow::Borrowed("TRACE"),
354      _ => Cow::Owned(method.to_string()),
355    }
356  }
357
358  fn version(version: http::Version) -> &'static str {
359    use http::Version;
360
361    match version {
362      Version::HTTP_09 => "0.9",
363      Version::HTTP_10 => "1.0",
364      Version::HTTP_11 => "1.1",
365      Version::HTTP_2 => "2",
366      Version::HTTP_3 => "3",
367      _ => unreachable!(),
368    }
369  }
370
371  fn version_v02(version: http_v02::Version) -> &'static str {
372    use http_v02::Version;
373
374    match version {
375      Version::HTTP_09 => "0.9",
376      Version::HTTP_10 => "1.0",
377      Version::HTTP_11 => "1.1",
378      Version::HTTP_2 => "2",
379      Version::HTTP_3 => "3",
380      _ => unreachable!(),
381    }
382  }
383
384  fn for_counter(&self) -> Vec<deno_telemetry::KeyValue> {
385    let mut attributes = vec![
386      deno_telemetry::KeyValue::new(
387        "http.request.method",
388        self.http_request_method.clone(),
389      ),
390      deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
391    ];
392
393    if let Some(address) = self.server_address.clone() {
394      attributes.push(deno_telemetry::KeyValue::new("server.address", address));
395    }
396    if let Some(port) = self.server_port {
397      attributes.push(deno_telemetry::KeyValue::new("server.port", port));
398    }
399
400    attributes
401  }
402
403  fn for_histogram(&self) -> Vec<deno_telemetry::KeyValue> {
404    let mut histogram_attributes = vec![
405      deno_telemetry::KeyValue::new(
406        "http.request.method",
407        self.http_request_method.clone(),
408      ),
409      deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
410      deno_telemetry::KeyValue::new(
411        "network.protocol.version",
412        self.network_protocol_version,
413      ),
414    ];
415
416    if let Some(address) = self.server_address.clone() {
417      histogram_attributes
418        .push(deno_telemetry::KeyValue::new("server.address", address));
419    }
420    if let Some(port) = self.server_port {
421      histogram_attributes
422        .push(deno_telemetry::KeyValue::new("server.port", port));
423    }
424    if let Some(status_code) = self.http_response_status_code {
425      histogram_attributes.push(deno_telemetry::KeyValue::new(
426        "http.response.status_code",
427        status_code,
428      ));
429    }
430
431    if let Some(error) = self.error_type {
432      histogram_attributes
433        .push(deno_telemetry::KeyValue::new("error.type", error));
434    }
435
436    histogram_attributes
437  }
438}
439
440impl OtelInfo {
441  fn new(
442    otel: &deno_telemetry::OtelGlobals,
443    instant: std::time::Instant,
444    request_size: u64,
445    attributes: OtelInfoAttributes,
446  ) -> Self {
447    let collectors = OTEL_COLLECTORS.get_or_init(|| {
448      let meter = otel
449        .meter_provider
450        .meter_with_scope(otel.builtin_instrumentation_scope.clone());
451
452      let duration = meter
453        .f64_histogram("http.server.request.duration")
454        .with_unit("s")
455        .with_description("Duration of HTTP server requests.")
456        .with_boundaries(vec![
457          0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0,
458          7.5, 10.0,
459        ])
460        .build();
461
462      let active_requests = meter
463        .i64_up_down_counter("http.server.active_requests")
464        .with_unit("{request}")
465        .with_description("Number of active HTTP server requests.")
466        .build();
467
468      let request_size = meter
469        .u64_histogram("http.server.request.body.size")
470        .with_unit("By")
471        .with_description("Size of HTTP server request bodies.")
472        .with_boundaries(vec![
473          0.0,
474          100.0,
475          1000.0,
476          10000.0,
477          100000.0,
478          1000000.0,
479          10000000.0,
480          100000000.0,
481          1000000000.0,
482        ])
483        .build();
484
485      let response_size = meter
486        .u64_histogram("http.server.response.body.size")
487        .with_unit("By")
488        .with_description("Size of HTTP server response bodies.")
489        .with_boundaries(vec![
490          0.0,
491          100.0,
492          1000.0,
493          10000.0,
494          100000.0,
495          1000000.0,
496          10000000.0,
497          100000000.0,
498          1000000000.0,
499        ])
500        .build();
501
502      OtelCollectors {
503        duration,
504        active_requests,
505        request_size,
506        response_size,
507      }
508    });
509
510    collectors.active_requests.add(1, &attributes.for_counter());
511
512    Self {
513      attributes,
514      duration: Some(instant),
515      request_size: Some(request_size),
516      response_size: Some(0),
517    }
518  }
519
520  fn handle_duration_and_request_size(&mut self) {
521    let collectors = OTEL_COLLECTORS.get().unwrap();
522    let attributes = self.attributes.for_histogram();
523
524    if let Some(duration) = self.duration.take() {
525      let duration = duration.elapsed();
526      collectors
527        .duration
528        .record(duration.as_secs_f64(), &attributes);
529    }
530
531    if let Some(request_size) = self.request_size.take() {
532      let collectors = OTEL_COLLECTORS.get().unwrap();
533      collectors.request_size.record(request_size, &attributes);
534    }
535  }
536}
537
538impl Drop for OtelInfo {
539  fn drop(&mut self) {
540    let collectors = OTEL_COLLECTORS.get().unwrap();
541
542    self.handle_duration_and_request_size();
543
544    collectors
545      .active_requests
546      .add(-1, &self.attributes.for_counter());
547
548    if let Some(response_size) = self.response_size {
549      collectors
550        .response_size
551        .record(response_size, &self.attributes.for_histogram());
552    }
553  }
554}
555
556fn handle_error_otel(
557  otel: &Option<Rc<RefCell<Option<OtelInfo>>>>,
558  error: &HttpError,
559) {
560  if let Some(otel) = otel.as_ref() {
561    let mut maybe_otel_info = otel.borrow_mut();
562    if let Some(otel_info) = maybe_otel_info.as_mut() {
563      otel_info.attributes.error_type = Some(match error {
564        HttpError::Resource(_) => "resource",
565        HttpError::Canceled(_) => "canceled",
566        HttpError::HyperV014(_) => "hyper",
567        HttpError::InvalidHeaderName(_) => "invalid header name",
568        HttpError::InvalidHeaderValue(_) => "invalid header value",
569        HttpError::Http(_) => "http",
570        HttpError::ResponseHeadersAlreadySent => {
571          "response headers already sent"
572        }
573        HttpError::ConnectionClosedWhileSendingResponse => {
574          "connection closed while sending response"
575        }
576        HttpError::AlreadyInUse => "already in use",
577        HttpError::Io(_) => "io",
578        HttpError::NoResponseHeaders => "no response headers",
579        HttpError::ResponseAlreadyCompleted => "response already completed",
580        HttpError::UpgradeBodyUsed => "upgrade body used",
581        HttpError::Other(_) => "unknown",
582      });
583    }
584  }
585}
586
/// Resource representing one HTTP connection driven by hyper 0.14.
struct HttpConnResource {
  /// Address used together with `scheme` when building request URLs.
  addr: HttpSocketAddr,
  /// URL scheme of the connection; also reported as `url.scheme` in metrics.
  scheme: &'static str,
  /// Sending side of the queue through which `accept()` hands acceptors to
  /// the hyper service.
  acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
  /// Shared handle on the task that drives the connection; yields the
  /// connection's final result.
  closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper_v014::Error>>>>,
  cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops.
}
594
impl HttpConnResource {
  /// Starts serving HTTP on `io` and returns the resource tracking the
  /// connection.
  ///
  /// The hyper connection future is spawned as a local task; its result is
  /// observable through `closed()`.
  fn new<S>(io: S, scheme: &'static str, addr: HttpSocketAddr) -> Self
  where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
  {
    // Queue of acceptors: `accept()` pushes, the hyper service pops one per
    // incoming request.
    let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>();
    let service = HttpService::new(acceptors_rx);

    let conn_fut = Http::new()
      .with_executor(LocalExecutor)
      .serve_connection(io, service)
      .with_upgrades();

    // When the cancel handle is used, the connection shuts down gracefully.
    // No new HTTP streams will be accepted, but existing streams will be able
    // to continue operating and eventually shut down cleanly.
    let cancel_handle = CancelHandle::new_rc();
    let shutdown_fut = never().or_cancel(&cancel_handle).fuse();

    // A local task that polls the hyper connection future to completion.
    let task_fut = async move {
      let conn_fut = pin!(conn_fut);
      let shutdown_fut = pin!(shutdown_fut);
      let result = match select(conn_fut, shutdown_fut).await {
        // The connection finished on its own.
        Either::Left((result, _)) => result,
        // Cancellation was requested: ask hyper to shut down gracefully, then
        // keep driving the connection until it completes.
        Either::Right((_, mut conn_fut)) => {
          conn_fut.as_mut().graceful_shutdown();
          conn_fut.await
        }
      };
      filter_enotconn(result).map_err(Arc::from)
    };
    // The remote handle lets `closed()` observe the task's result; it is
    // made shareable so that it can be awaited more than once.
    let (task_fut, closed_fut) = task_fut.remote_handle();
    let closed_fut = closed_fut.shared();
    spawn(task_fut);

    Self {
      addr,
      scheme,
      acceptors_tx,
      closed_fut,
      cancel_handle,
    }
  }

  // Accepts a new incoming HTTP request.
  //
  // Returns the read and write resources of the new stream together with the
  // request method and URL. Returns `Ok(None)` when no further request can be
  // accepted and the connection closed without error, and the connection's
  // error if it closed with one. The whole operation is cancelled through the
  // resource's cancel handle.
  async fn accept(
    self: &Rc<Self>,
  ) -> Result<
    Option<(
      HttpStreamReadResource,
      HttpStreamWriteResource,
      String,
      String,
    )>,
    HttpError,
  > {
    let fut = async {
      let (request_tx, request_rx) = oneshot::channel();
      let (response_tx, response_rx) = oneshot::channel();

      // Capture the start time before waiting for the request, but only when
      // metrics are enabled.
      let otel_instant = OTEL_GLOBALS
        .get()
        .filter(|o| o.has_metrics())
        .map(|_| std::time::Instant::now());

      // Queue an acceptor for the hyper service; `None` here means the
      // receiving side of the queue is gone.
      let acceptor = HttpAcceptor::new(request_tx, response_rx);
      self.acceptors_tx.unbounded_send(acceptor).ok()?;

      let request = request_rx.await.ok()?;
      // Only Brotli and Gzip are considered; everything else falls back to
      // `Identity`.
      let accept_encoding = {
        let encodings =
          fly_accept_encoding::encodings_iter_http_02(request.headers())
            .filter(|r| {
              matches!(r, Ok((Some(Encoding::Brotli | Encoding::Gzip), _)))
            });

        fly_accept_encoding::preferred(encodings)
          .ok()
          .flatten()
          .unwrap_or(Encoding::Identity)
      };

      let otel_info =
        OTEL_GLOBALS.get().filter(|o| o.has_metrics()).map(|otel| {
          let size_hint = request.size_hint();
          Rc::new(RefCell::new(Some(OtelInfo::new(
            otel,
            // Set above under the same `has_metrics()` condition.
            otel_instant.unwrap(),
            // Prefer the upper bound of the body size; fall back to the
            // lower bound when no upper bound is known.
            size_hint.upper().unwrap_or(size_hint.lower()),
            OtelInfoAttributes {
              http_request_method: OtelInfoAttributes::method_v02(
                request.method(),
              ),
              url_scheme: Cow::Borrowed(self.scheme),
              network_protocol_version: OtelInfoAttributes::version_v02(
                request.version(),
              ),
              server_address: request.uri().host().map(|host| host.to_string()),
              server_port: request.uri().port_u16().map(|port| port as i64),
              error_type: Default::default(),
              http_response_status_code: Default::default(),
            },
          ))))
        });

      let method = request.method().to_string();
      let url = req_url(&request, self.scheme, &self.addr);
      // Both halves of the stream share the same telemetry info.
      let read_stream =
        HttpStreamReadResource::new(self, request, otel_info.clone());
      let write_stream = HttpStreamWriteResource::new(
        self,
        response_tx,
        accept_encoding,
        otel_info,
      );
      Some((read_stream, write_stream, method, url))
    };

    async {
      match fut.await {
        Some(stream) => Ok(Some(stream)),
        // Return the connection error, if any.
        None => self.closed().map_ok(|_| None).await,
      }
    }
    .try_or_cancel(&self.cancel_handle)
    .await
  }

  /// A future that completes when this HTTP connection is closed or errors.
  async fn closed(&self) -> Result<(), HttpError> {
    self.closed_fut.clone().map_err(HttpError::HyperV014).await
  }
}
730
731impl Resource for HttpConnResource {
732  fn name(&self) -> Cow<'_, str> {
733    "httpConn".into()
734  }
735
736  fn close(self: Rc<Self>) {
737    self.cancel_handle.cancel();
738  }
739}
740
741/// Creates a new HttpConn resource which uses `io` as its transport.
742pub fn http_create_conn_resource<S, A>(
743  state: &mut OpState,
744  io: S,
745  addr: A,
746  scheme: &'static str,
747) -> ResourceId
748where
749  S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
750  A: Into<HttpSocketAddr>,
751{
752  let conn = HttpConnResource::new(io, scheme, addr.into());
753  state.resource_table.add(conn)
754}
755
/// An object that implements the `hyper::Service` trait, through which Hyper
/// delivers incoming HTTP requests.
struct HttpService {
  /// Queue of pending acceptors. It is peekable so that `poll_ready` can check
  /// for an available acceptor without consuming it.
  acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>,
}
761
762impl HttpService {
763  fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self {
764    let acceptors_rx = acceptors_rx.peekable();
765    Self { acceptors_rx }
766  }
767}
768
769impl Service<Request<Body>> for HttpService {
770  type Response = Response<Body>;
771  type Error = oneshot::Canceled;
772  type Future = oneshot::Receiver<Response<Body>>;
773
774  fn poll_ready(
775    &mut self,
776    cx: &mut Context<'_>,
777  ) -> Poll<Result<(), Self::Error>> {
778    let acceptors_rx = Pin::new(&mut self.acceptors_rx);
779    let result = ready!(acceptors_rx.poll_peek(cx))
780      .map(|_| ())
781      .ok_or(oneshot::Canceled);
782    Poll::Ready(result)
783  }
784
785  fn call(&mut self, request: Request<Body>) -> Self::Future {
786    let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap();
787    acceptor.call(request)
788  }
789}
790
/// A pair of one-shot channels which first transfer a HTTP request from the
/// Hyper service to the HttpConn resource, and then take the Response back to
/// the service.
struct HttpAcceptor {
  /// Delivers the incoming request to the waiting `accept()` call.
  request_tx: oneshot::Sender<Request<Body>>,
  /// Receives the response; returned to Hyper as the service future.
  response_rx: oneshot::Receiver<Response<Body>>,
}
798
799impl HttpAcceptor {
800  fn new(
801    request_tx: oneshot::Sender<Request<Body>>,
802    response_rx: oneshot::Receiver<Response<Body>>,
803  ) -> Self {
804    Self {
805      request_tx,
806      response_rx,
807    }
808  }
809
810  fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> {
811    let Self {
812      request_tx,
813      response_rx,
814    } = self;
815    request_tx
816      .send(request)
817      .map(|_| response_rx)
818      .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver.
819  }
820}
821
/// Resource for the read (request) half of an HTTP stream.
pub struct HttpStreamReadResource {
  /// Reference to the connection the stream belongs to; not read in this part
  /// of the file.
  _conn: Rc<HttpConnResource>,
  /// Current state of the request reader.
  pub rd: AsyncRefCell<HttpRequestReader>,
  /// Cancels in-flight reads when the resource is closed.
  cancel_handle: CancelHandle,
  /// Size hint of the request body, captured at construction.
  size: SizeHint,
  /// Telemetry info shared with the write half, if metrics are enabled.
  otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
}
829
/// Resource for the write (response) half of an HTTP stream.
pub struct HttpStreamWriteResource {
  /// The connection the stream belongs to.
  conn: Rc<HttpConnResource>,
  /// Current state of the response writer.
  wr: AsyncRefCell<HttpResponseWriter>,
  /// Encoding selected from the request when the stream was accepted.
  accept_encoding: Encoding,
  /// Telemetry info shared with the read half, if metrics are enabled.
  otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
}
836
837impl HttpStreamReadResource {
838  fn new(
839    conn: &Rc<HttpConnResource>,
840    request: Request<Body>,
841    otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
842  ) -> Self {
843    let size = request.body().size_hint();
844    Self {
845      _conn: conn.clone(),
846      rd: HttpRequestReader::Headers(request).into(),
847      size,
848      cancel_handle: CancelHandle::new(),
849      otel_info,
850    }
851  }
852}
853
impl Resource for HttpStreamReadResource {
  fn name(&self) -> Cow<'_, str> {
    "httpReadStream".into()
  }

  /// Reads up to `limit` bytes of the request body.
  ///
  /// Returns an empty buffer when the body is exhausted or the reader is
  /// closed. A body error is returned as `HttpError::HyperV014`. The read is
  /// cancelled when the resource is closed.
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
    Box::pin(async move {
      let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;

      // Get hold of the body, first switching the reader from the `Headers`
      // state to the `Body` state if that has not happened yet.
      let body = loop {
        match &mut *rd {
          HttpRequestReader::Headers(_) => {}
          HttpRequestReader::Body(_, body) => break body,
          HttpRequestReader::Closed => return Ok(BufView::empty()),
        }
        // Only the `Headers` state reaches this point. `take` leaves `Closed`
        // (the default) behind until the new state is stored.
        match take(&mut *rd) {
          HttpRequestReader::Headers(request) => {
            let (parts, body) = request.into_parts();
            *rd = HttpRequestReader::Body(parts.headers, body.peekable());
          }
          _ => unreachable!(),
        };
      };

      let fut = async {
        let mut body = Pin::new(body);
        loop {
          match body.as_mut().peek_mut().await {
            // Split off at most `limit` bytes; the remainder of the chunk
            // stays peeked for the next read.
            Some(Ok(chunk)) if !chunk.is_empty() => {
              let len = min(limit, chunk.len());
              let buf = chunk.split_to(len);
              let view = BufView::from(buf);
              break Ok(view);
            }
            // This unwrap is safe because `peek_mut()` returned `Some`, and thus
            // currently has a peeked value that can be synchronously returned
            // from `next()`.
            //
            // The future returned from `next()` is always ready, so we can
            // safely call `await` on it without creating a race condition.
            Some(_) => match body.as_mut().next().await.unwrap() {
              // A fully consumed chunk: discard it and look at the next one.
              Ok(chunk) => assert!(chunk.is_empty()),
              Err(err) => {
                break Err(JsErrorBox::from_err(HttpError::HyperV014(
                  Arc::new(err),
                )));
              }
            },
            // End of body.
            None => break Ok(BufView::empty()),
          }
        }
      };

      let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
      fut.try_or_cancel(cancel_handle).await
    })
  }

  /// Closing the resource cancels any in-flight read.
  fn close(self: Rc<Self>) {
    self.cancel_handle.cancel();
  }

  /// Lower and upper bound of the body size, as captured at construction.
  fn size_hint(&self) -> (u64, Option<u64>) {
    (self.size.lower(), self.size.upper())
  }
}
920
921impl HttpStreamWriteResource {
922  fn new(
923    conn: &Rc<HttpConnResource>,
924    response_tx: oneshot::Sender<Response<Body>>,
925    accept_encoding: Encoding,
926    otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
927  ) -> Self {
928    Self {
929      conn: conn.clone(),
930      wr: HttpResponseWriter::Headers(response_tx).into(),
931      accept_encoding,
932      otel_info,
933    }
934  }
935}
936
937impl Resource for HttpStreamWriteResource {
938  fn name(&self) -> Cow<'_, str> {
939    "httpWriteStream".into()
940  }
941}
942
/// The read half of an HTTP stream.
#[derive(Default)]
pub enum HttpRequestReader {
  /// Initial state: the complete request, body not yet touched.
  Headers(Request<Body>),
  /// The request's headers together with its body, which is being read.
  Body(HeaderMap<HeaderValue>, Peekable<Body>),
  /// Nothing left to read; reads return an empty buffer.
  #[default]
  Closed,
}
951
/// The write half of an HTTP stream.
#[derive(Default)]
enum HttpResponseWriter {
  /// Initial state: holds the channel on which the response is sent.
  Headers(oneshot::Sender<Response<Body>>),
  // NOTE(review): presumably the compressed-body path, given the sibling
  // `BodyUncompressed` variant; confirm against the write ops.
  Body {
    writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
    shutdown_handle: ShutdownHandle,
  },
  /// Body written through a hyper body sender wrapper.
  BodyUncompressed(BodyUncompressedSender),
  #[default]
  Closed,
}
964
/// Wrapper around a hyper body sender that aborts the body when dropped,
/// unless `shutdown()` was called first.
struct BodyUncompressedSender(Option<hyper_v014::body::Sender>);
966
967impl BodyUncompressedSender {
968  fn sender(&mut self) -> &mut hyper_v014::body::Sender {
969    // This is safe because we only ever take the sender out of the option
970    // inside of the shutdown method.
971    self.0.as_mut().unwrap()
972  }
973
974  fn shutdown(mut self) {
975    // take the sender out of self so that when self is dropped at the end of
976    // this block, it doesn't get aborted
977    self.0.take();
978  }
979}
980
981impl From<hyper_v014::body::Sender> for BodyUncompressedSender {
982  fn from(sender: hyper_v014::body::Sender) -> Self {
983    BodyUncompressedSender(Some(sender))
984  }
985}
986
987impl Drop for BodyUncompressedSender {
988  fn drop(&mut self) {
989    if let Some(sender) = self.0.take() {
990      sender.abort();
991    }
992  }
993}
994
/// Result returned by `op_http_accept` for an accepted request: the resource
/// ids of the read and write halves, followed by the method and the URL.
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(deno_core::ToV8)]
struct NextRequestResponse(
  // read_stream_rid:
  ResourceId,
  // write_stream_rid:
  ResourceId,
  // method:
  // This is a String rather than a ByteString because reqwest will only return
  // the method as a str which is guaranteed to be ASCII-only.
  String,
  // url:
  String,
);
1009
1010#[op2]
1011async fn op_http_accept(
1012  state: Rc<RefCell<OpState>>,
1013  #[smi] rid: ResourceId,
1014) -> Result<Option<NextRequestResponse>, HttpError> {
1015  let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
1016
1017  match conn.accept().await {
1018    Ok(Some((read_stream, write_stream, method, url))) => {
1019      let read_stream_rid = state
1020        .borrow_mut()
1021        .resource_table
1022        .add_rc(Rc::new(read_stream));
1023      let write_stream_rid = state
1024        .borrow_mut()
1025        .resource_table
1026        .add_rc(Rc::new(write_stream));
1027      let r =
1028        NextRequestResponse(read_stream_rid, write_stream_rid, method, url);
1029      Ok(Some(r))
1030    }
1031    Ok(None) => Ok(None),
1032    Err(err) => Err(err),
1033  }
1034}
1035
1036fn req_url(
1037  req: &hyper_v014::Request<hyper_v014::Body>,
1038  scheme: &'static str,
1039  addr: &HttpSocketAddr,
1040) -> String {
1041  let host: Cow<'_, str> = match addr {
1042    HttpSocketAddr::IpSocket(addr) => {
1043      if let Some(auth) = req.uri().authority() {
1044        match addr.port() {
1045          443 if scheme == "https" => Cow::Borrowed(auth.host()),
1046          80 if scheme == "http" => Cow::Borrowed(auth.host()),
1047          _ => Cow::Borrowed(auth.as_str()), // Includes port number.
1048        }
1049      } else if let Some(host) = req.uri().host() {
1050        Cow::Borrowed(host)
1051      } else if let Some(host) = req.headers().get("HOST") {
1052        match host.to_str() {
1053          Ok(host) => Cow::Borrowed(host),
1054          Err(_) => Cow::Owned(
1055            host
1056              .as_bytes()
1057              .iter()
1058              .cloned()
1059              .map(char::from)
1060              .collect::<String>(),
1061          ),
1062        }
1063      } else {
1064        Cow::Owned(addr.to_string())
1065      }
1066    }
1067    // There is no standard way for unix domain socket URLs
1068    // nginx and nodejs request use http://unix:[socket_path]:/ but it is not a valid URL
1069    // httpie uses http+unix://[percent_encoding_of_path]/ which we follow
1070    #[cfg(unix)]
1071    HttpSocketAddr::UnixSocket(addr) => Cow::Owned(
1072      percent_encoding::percent_encode(
1073        addr
1074          .as_pathname()
1075          .and_then(|x| x.to_str())
1076          .unwrap_or_default()
1077          .as_bytes(),
1078        percent_encoding::NON_ALPHANUMERIC,
1079      )
1080      .to_string(),
1081    ),
1082  };
1083  let path = req
1084    .uri()
1085    .path_and_query()
1086    .map(|p| p.as_str())
1087    .unwrap_or("/");
1088  [scheme, "://", &host, path].concat()
1089}
1090
1091fn req_headers(
1092  header_map: &HeaderMap<HeaderValue>,
1093) -> Vec<(ByteString, ByteString)> {
1094  // We treat cookies specially, because we don't want them to get them
1095  // mangled by the `Headers` object in JS. What we do is take all cookie
1096  // headers and concat them into a single cookie header, separated by
1097  // semicolons.
1098  let cookie_sep = "; ".as_bytes();
1099  let mut cookies = vec![];
1100
1101  let mut headers = Vec::with_capacity(header_map.len());
1102  for (name, value) in header_map.iter() {
1103    if name == hyper_v014::header::COOKIE {
1104      cookies.push(value.as_bytes());
1105    } else {
1106      let name: &[u8] = name.as_ref();
1107      let value = value.as_bytes();
1108      headers.push((name.into(), value.into()));
1109    }
1110  }
1111
1112  if !cookies.is_empty() {
1113    headers.push(("cookie".into(), cookies.join(cookie_sep).into()));
1114  }
1115
1116  headers
1117}
1118
1119#[op2]
1120async fn op_http_write_headers(
1121  state: Rc<RefCell<OpState>>,
1122  #[smi] rid: u32,
1123  #[smi] status: u16,
1124  #[scoped] headers: Vec<(ByteString, ByteString)>,
1125  #[serde] data: Option<StringOrBuffer>,
1126) -> Result<(), HttpError> {
1127  let stream = state
1128    .borrow_mut()
1129    .resource_table
1130    .get::<HttpStreamWriteResource>(rid)?;
1131
1132  // Track supported encoding
1133  let encoding = stream.accept_encoding;
1134
1135  let mut builder = Response::builder();
1136  // SAFETY: can not fail, since a fresh Builder is non-errored
1137  let hmap = unsafe { builder.headers_mut().unwrap_unchecked() };
1138
1139  // Add headers
1140  hmap.reserve(headers.len() + 2);
1141  for (k, v) in headers.into_iter() {
1142    let v: Vec<u8> = v.into();
1143    hmap.append(
1144      HeaderName::try_from(k.as_slice())?,
1145      HeaderValue::try_from(v)?,
1146    );
1147  }
1148  ensure_vary_accept_encoding(hmap);
1149
1150  let accepts_compression =
1151    matches!(encoding, Encoding::Brotli | Encoding::Gzip);
1152  let compressing = accepts_compression
1153    && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
1154    && should_compress(hmap);
1155
1156  if compressing {
1157    weaken_etag(hmap);
1158    // Drop 'content-length' header. Hyper will update it using compressed body.
1159    hmap.remove(hyper_v014::header::CONTENT_LENGTH);
1160    // Content-Encoding header
1161    hmap.insert(
1162      hyper_v014::header::CONTENT_ENCODING,
1163      HeaderValue::from_static(match encoding {
1164        Encoding::Brotli => "br",
1165        Encoding::Gzip => "gzip",
1166        _ => unreachable!(), // Forbidden by accepts_compression
1167      }),
1168    );
1169  }
1170
1171  let (new_wr, body) = http_response(data, compressing, encoding)?;
1172  let body = builder.status(status).body(body)?;
1173
1174  if let Some(otel) = stream.otel_info.as_ref() {
1175    let mut otel = otel.borrow_mut();
1176    if let Some(otel_info) = otel.as_mut() {
1177      otel_info.attributes.http_response_status_code = Some(status as _);
1178      otel_info.handle_duration_and_request_size();
1179    }
1180  }
1181
1182  let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1183  let response_tx = match replace(&mut *old_wr, new_wr) {
1184    HttpResponseWriter::Headers(response_tx) => response_tx,
1185    _ => return Err(HttpError::ResponseHeadersAlreadySent),
1186  };
1187
1188  match response_tx.send(body) {
1189    Ok(_) => Ok(()),
1190    Err(_) => {
1191      stream.conn.closed().await?;
1192      Err(HttpError::ConnectionClosedWhileSendingResponse)
1193    }
1194  }
1195}
1196
1197#[op2]
1198fn op_http_headers(
1199  state: &mut OpState,
1200  #[smi] rid: u32,
1201) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
1202  let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
1203  let rd = RcRef::map(&stream, |r| &r.rd)
1204    .try_borrow()
1205    .ok_or(HttpError::AlreadyInUse)
1206    .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?;
1207  match &*rd {
1208    HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
1209    HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
1210    _ => unreachable!(),
1211  }
1212}
1213
1214fn http_response(
1215  data: Option<StringOrBuffer>,
1216  compressing: bool,
1217  encoding: Encoding,
1218) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
1219  // Gzip, after level 1, doesn't produce significant size difference.
1220  // This default matches nginx default gzip compression level (1):
1221  // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
1222  const GZIP_DEFAULT_COMPRESSION_LEVEL: u8 = 1;
1223
1224  match data {
1225    Some(data) if compressing => match encoding {
1226      Encoding::Brotli => {
1227        // quality level 6 is based on google's nginx default value for
1228        // on-the-fly compression
1229        // https://github.com/google/ngx_brotli#brotli_comp_level
1230        // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
1231        // (~4MB)
1232        let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
1233        writer.write_all(&data)?;
1234        Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
1235      }
1236      Encoding::Gzip => {
1237        let mut writer = GzEncoder::new(
1238          Vec::new(),
1239          Compression::new(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1240        );
1241        writer.write_all(&data)?;
1242        Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
1243      }
1244      _ => unreachable!(), // forbidden by accepts_compression
1245    },
1246    Some(data) => {
1247      // If a buffer was passed, but isn't compressible, we use it to
1248      // construct a response body.
1249      Ok((HttpResponseWriter::Closed, data.to_vec().into()))
1250    }
1251    None if compressing => {
1252      // Create a one way pipe that implements tokio's async io traits. To do
1253      // this we create a [tokio::io::DuplexStream], but then throw away one
1254      // of the directions to create a one way pipe.
1255      let (a, b) = tokio::io::duplex(64 * 1024);
1256      let (reader, _) = tokio::io::split(a);
1257      let (_, writer) = tokio::io::split(b);
1258      let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
1259        Encoding::Brotli => {
1260          Box::pin(BrotliEncoder::with_quality(writer, Level::Fastest))
1261        }
1262        Encoding::Gzip => Box::pin(GzipEncoder::with_quality(
1263          writer,
1264          Level::Precise(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1265        )),
1266        _ => unreachable!(), // forbidden by accepts_compression
1267      };
1268      let (stream, shutdown_handle) =
1269        ExternallyAbortableReaderStream::new(reader);
1270      Ok((
1271        HttpResponseWriter::Body {
1272          writer,
1273          shutdown_handle,
1274        },
1275        Body::wrap_stream(stream),
1276      ))
1277    }
1278    None => {
1279      let (body_tx, body_rx) = Body::channel();
1280      Ok((
1281        HttpResponseWriter::BodyUncompressed(body_tx.into()),
1282        body_rx,
1283      ))
1284    }
1285  }
1286}
1287
1288// If user provided a ETag header for uncompressed data, we need to
1289// ensure it is a Weak Etag header ("W/").
1290fn weaken_etag(hmap: &mut hyper_v014::HeaderMap) {
1291  if let Some(etag) = hmap.get_mut(hyper_v014::header::ETAG)
1292    && !etag.as_bytes().starts_with(b"W/")
1293  {
1294    let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
1295    v.extend(b"W/");
1296    v.extend(etag.as_bytes());
1297    *etag = v.try_into().unwrap();
1298  }
1299}
1300
1301// Set Vary: Accept-Encoding header for direct body response.
1302// Note: we set the header irrespective of whether or not we compress the data
1303// to make sure cache services do not serve uncompressed data to clients that
1304// support compression.
1305fn ensure_vary_accept_encoding(hmap: &mut hyper_v014::HeaderMap) {
1306  if let Some(v) = hmap.get_mut(hyper_v014::header::VARY)
1307    && let Ok(s) = v.to_str()
1308  {
1309    if !s.to_lowercase().contains("accept-encoding") {
1310      *v = format!("Accept-Encoding, {s}").try_into().unwrap()
1311    }
1312    return;
1313  }
1314  hmap.insert(
1315    hyper_v014::header::VARY,
1316    HeaderValue::from_static("Accept-Encoding"),
1317  );
1318}
1319
1320fn should_compress(headers: &hyper_v014::HeaderMap) -> bool {
1321  // skip compression if the cache-control header value is set to "no-transform" or not utf8
1322  fn cache_control_no_transform(
1323    headers: &hyper_v014::HeaderMap,
1324  ) -> Option<bool> {
1325    let v = headers.get(hyper_v014::header::CACHE_CONTROL)?;
1326    let s = match std::str::from_utf8(v.as_bytes()) {
1327      Ok(s) => s,
1328      Err(_) => return Some(true),
1329    };
1330    let c = CacheControl::from_value(s)?;
1331    Some(c.no_transform)
1332  }
1333  // we skip compression if the `content-range` header value is set, as it
1334  // indicates the contents of the body were negotiated based directly
1335  // with the user code and we can't compress the response
1336  let content_range = headers.contains_key(hyper_v014::header::CONTENT_RANGE);
1337  // assume body is already compressed if Content-Encoding header present, thus avoid recompressing
1338  let is_precompressed =
1339    headers.contains_key(hyper_v014::header::CONTENT_ENCODING);
1340
1341  !content_range
1342    && !is_precompressed
1343    && !cache_control_no_transform(headers).unwrap_or_default()
1344    && headers
1345      .get(hyper_v014::header::CONTENT_TYPE)
1346      .map(compressible::is_content_compressible)
1347      .unwrap_or_default()
1348}
1349
/// Streams the contents of resource `stream` into the response body of write
/// stream `rid`.
///
/// The resource is read in chunks of up to 64 KiB until an empty chunk is
/// returned. The writer lock is held for the whole transfer.
///
/// # Errors
///
/// * `NoResponseHeaders` if the response headers were not written yet.
/// * `ResponseAlreadyCompleted` if the writer is closed, including when an
///   earlier chunk failed to be written without a connection error.
/// * Any error from reading the resource or from the connection itself.
#[op2]
async fn op_http_write_resource(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[smi] stream: ResourceId,
) -> Result<(), HttpError> {
  let http_stream = state
    .borrow()
    .resource_table
    .get::<HttpStreamWriteResource>(rid)?;
  let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
  let resource = state.borrow().resource_table.get_any(stream)?;
  loop {
    // Re-check the state on every iteration: a failed write below switches
    // the writer to `Closed`.
    match *wr {
      HttpResponseWriter::Headers(_) => {
        return Err(HttpError::NoResponseHeaders);
      }
      HttpResponseWriter::Closed => {
        return Err(HttpError::ResponseAlreadyCompleted);
      }
      _ => {}
    };

    let view = resource.clone().read(64 * 1024).await?; // 64KB
    // An empty read marks the end of the resource.
    if view.is_empty() {
      break;
    }

    match &mut *wr {
      HttpResponseWriter::Body { writer, .. } => {
        // Write the chunk and flush it right away.
        let mut result = writer.write_all(&view).await;
        if result.is_ok() {
          result = writer.flush().await;
        }
        if let Err(err) = result {
          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
          // Don't return "broken pipe", that's an implementation detail.
          // Pull up the failure associated with the transport connection instead.
          http_stream.conn.closed().await?;
          // If there was no connection error, close the writer.
          *wr = HttpResponseWriter::Closed;
        }
      }
      HttpResponseWriter::BodyUncompressed(body) => {
        let bytes = view.to_vec().into();
        if let Err(err) = body.sender().send_data(bytes).await {
          assert!(err.is_closed());
          // Pull up the failure associated with the transport connection instead.
          http_stream.conn.closed().await?;
          // If there was no connection error, drop body_tx.
          *wr = HttpResponseWriter::Closed;
        }
      }
      // `Headers` and `Closed` returned early at the top of the loop.
      _ => unreachable!(),
    };
  }
  Ok(())
}
1408
/// Writes `buf` to the response body of write stream `rid`.
///
/// The length of `buf` is added to the recorded response size (when one is
/// being tracked) before the writer state is checked.
///
/// # Errors
///
/// * `NoResponseHeaders` if the response headers were not written yet.
/// * `ResponseAlreadyCompleted` if the writer is closed, or if the write
///   fails without a connection error (the writer is then closed).
/// * The connection error, if the write fails because of one.
#[op2]
async fn op_http_write(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[buffer] buf: JsBuffer,
) -> Result<(), HttpError> {
  let stream = state
    .borrow()
    .resource_table
    .get::<HttpStreamWriteResource>(rid)?;
  let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;

  // Account for the bytes in the telemetry record, if there is one.
  if let Some(otel) = stream.otel_info.as_ref() {
    let mut maybe_otel_info = otel.borrow_mut();
    if let Some(otel_info) = maybe_otel_info.as_mut()
      && let Some(response_size) = otel_info.response_size.as_mut()
    {
      *response_size += buf.len() as u64;
    }
  }

  match &mut *wr {
    HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
    HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
    HttpResponseWriter::Body { writer, .. } => {
      // Write the buffer and flush it right away.
      let mut result = writer.write_all(&buf).await;
      if result.is_ok() {
        result = writer.flush().await;
      }
      match result {
        Ok(_) => Ok(()),
        Err(err) => {
          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
          // Don't return "broken pipe", that's an implementation detail.
          // Pull up the failure associated with the transport connection instead.
          stream.conn.closed().await?;
          // If there was no connection error, close the writer.
          *wr = HttpResponseWriter::Closed;
          Err(HttpError::ResponseAlreadyCompleted)
        }
      }
    }
    HttpResponseWriter::BodyUncompressed(body) => {
      let bytes = Bytes::from(buf.to_vec());
      match body.sender().send_data(bytes).await {
        Ok(_) => Ok(()),
        Err(err) => {
          assert!(err.is_closed());
          // Pull up the failure associated with the transport connection instead.
          stream.conn.closed().await?;
          // If there was no connection error, drop body_tx.
          *wr = HttpResponseWriter::Closed;
          Err(HttpError::ResponseAlreadyCompleted)
        }
      }
    }
  }
}
1467
1468/// Gracefully closes the write half of the HTTP stream. Note that this does not
1469/// remove the HTTP stream resource from the resource table; it still has to be
1470/// closed with `Deno.core.close()`.
1471#[op2]
1472async fn op_http_shutdown(
1473  state: Rc<RefCell<OpState>>,
1474  #[smi] rid: ResourceId,
1475) -> Result<(), HttpError> {
1476  let stream = state
1477    .borrow()
1478    .resource_table
1479    .get::<HttpStreamWriteResource>(rid)?;
1480  let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1481  let wr = take(&mut *wr);
1482  match wr {
1483    HttpResponseWriter::Body {
1484      mut writer,
1485      shutdown_handle,
1486    } => {
1487      shutdown_handle.shutdown();
1488      match writer.shutdown().await {
1489        Ok(_) => {}
1490        Err(err) => {
1491          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1492          // Don't return "broken pipe", that's an implementation detail.
1493          // Pull up the failure associated with the transport connection instead.
1494          stream.conn.closed().await?;
1495        }
1496      }
1497    }
1498    HttpResponseWriter::BodyUncompressed(body) => {
1499      body.shutdown();
1500    }
1501    _ => {}
1502  };
1503  Ok(())
1504}
1505
1506#[op2]
1507#[string]
1508fn op_http_websocket_accept_header(#[string] key: String) -> String {
1509  let digest = aws_lc_rs::digest::digest(
1510    &aws_lc_rs::digest::SHA1_FOR_LEGACY_USE_ONLY,
1511    format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
1512  );
1513  BASE64_STANDARD.encode(digest)
1514}
1515
1516#[op2]
1517#[smi]
1518async fn op_http_upgrade_websocket(
1519  state: Rc<RefCell<OpState>>,
1520  #[smi] rid: ResourceId,
1521) -> Result<ResourceId, HttpError> {
1522  let stream = state
1523    .borrow_mut()
1524    .resource_table
1525    .get::<HttpStreamReadResource>(rid)?;
1526  let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
1527
1528  let request = match &mut *rd {
1529    HttpRequestReader::Headers(request) => request,
1530    _ => {
1531      return Err(HttpError::UpgradeBodyUsed)
1532        .inspect_err(|e| handle_error_otel(&stream.otel_info, e));
1533    }
1534  };
1535
1536  let (transport, bytes) = extract_network_stream(
1537    hyper_v014::upgrade::on(request)
1538      .await
1539      .map_err(|err| HttpError::HyperV014(Arc::new(err)))
1540      .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?,
1541  );
1542  Ok(ws_create_server_stream(
1543    &mut state.borrow_mut(),
1544    transport,
1545    bytes,
1546  ))
1547}
1548
/// Executor that runs futures through `deno_core::unsync::spawn`.
///
/// Needed so hyper can use non Send futures.
#[derive(Clone)]
pub struct LocalExecutor;
1552
1553impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor
1554where
1555  Fut: Future + 'static,
1556  Fut::Output: 'static,
1557{
1558  fn execute(&self, fut: Fut) {
1559    deno_core::unsync::spawn(fut);
1560  }
1561}
1562
1563impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
1564where
1565  Fut: Future + 'static,
1566  Fut::Output: 'static,
1567{
1568  fn execute(&self, fut: Fut) {
1569    deno_core::unsync::spawn(fut);
1570  }
1571}
1572
1573/// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
1574fn filter_enotconn(
1575  result: Result<(), hyper_v014::Error>,
1576) -> Result<(), hyper_v014::Error> {
1577  if result
1578    .as_ref()
1579    .err()
1580    .and_then(|err| err.source())
1581    .and_then(|err| err.downcast_ref::<io::Error>())
1582    .filter(|err| err.kind() == io::ErrorKind::NotConnected)
1583    .is_some()
1584  {
1585    Ok(())
1586  } else {
1587    result
1588  }
1589}
1590
1591/// Create a future that is forever pending.
1592fn never() -> Pending<Never> {
1593  pending()
1594}
1595
1596trait CanDowncastUpgrade: Sized {
1597  fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1598    self,
1599  ) -> Result<(T, Bytes), Self>;
1600}
1601
1602impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
1603  fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1604    self,
1605  ) -> Result<(T, Bytes), Self> {
1606    let hyper::upgrade::Parts { io, read_buf, .. } =
1607      self.downcast::<TokioIo<T>>()?;
1608    Ok((io.into_inner(), read_buf))
1609  }
1610}
1611
1612impl CanDowncastUpgrade for hyper_v014::upgrade::Upgraded {
1613  fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1614    self,
1615  ) -> Result<(T, Bytes), Self> {
1616    let hyper_v014::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
1617    Ok((io, read_buf))
1618  }
1619}
1620
1621fn maybe_extract_network_stream<
1622  T: Into<NetworkStream> + AsyncRead + AsyncWrite + Unpin + 'static,
1623  U: CanDowncastUpgrade,
1624>(
1625  upgraded: U,
1626) -> Result<(NetworkStream, Bytes), U> {
1627  let upgraded = match upgraded.downcast::<T>() {
1628    Ok((stream, bytes)) => return Ok((stream.into(), bytes)),
1629    Err(x) => x,
1630  };
1631
1632  match upgraded.downcast::<NetworkBufferedStream<T>>() {
1633    Ok((stream, upgraded_bytes)) => {
1634      // Both the upgrade and the stream might have unread bytes
1635      let (io, stream_bytes) = stream.into_inner();
1636      let bytes = match (stream_bytes.is_empty(), upgraded_bytes.is_empty()) {
1637        (false, false) => Bytes::default(),
1638        (true, false) => upgraded_bytes,
1639        (false, true) => stream_bytes,
1640        (true, true) => {
1641          // The upgraded bytes come first as they have already been read
1642          let mut v = upgraded_bytes.to_vec();
1643          v.append(&mut stream_bytes.to_vec());
1644          Bytes::from(v)
1645        }
1646      };
1647      Ok((io.into(), bytes))
1648    }
1649    Err(x) => Err(x),
1650  }
1651}
1652
1653fn extract_network_stream<U: CanDowncastUpgrade>(
1654  upgraded: U,
1655) -> (NetworkStream, Bytes) {
1656  let upgraded =
1657    match maybe_extract_network_stream::<tokio::net::TcpStream, _>(upgraded) {
1658      Ok(res) => return res,
1659      Err(x) => x,
1660    };
1661  let upgraded = match maybe_extract_network_stream::<
1662    deno_net::ops_tls::TlsStream<TcpStream>,
1663    _,
1664  >(upgraded)
1665  {
1666    Ok(res) => return res,
1667    Err(x) => x,
1668  };
1669  #[cfg(unix)]
1670  let upgraded =
1671    match maybe_extract_network_stream::<tokio::net::UnixStream, _>(upgraded) {
1672      Ok(res) => return res,
1673      Err(x) => x,
1674    };
1675  #[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))]
1676  let upgraded =
1677    match maybe_extract_network_stream::<tokio_vsock::VsockStream, _>(upgraded)
1678    {
1679      Ok(res) => return res,
1680      Err(x) => x,
1681    };
1682  let upgraded =
1683    match maybe_extract_network_stream::<deno_net::tunnel::TunnelStream, _>(
1684      upgraded,
1685    ) {
1686      Ok(res) => return res,
1687      Err(x) => x,
1688    };
1689  let upgraded =
1690    match maybe_extract_network_stream::<NetworkStream, _>(upgraded) {
1691      Ok(res) => return res,
1692      Err(x) => x,
1693    };
1694
1695  // TODO(mmastrac): HTTP/2 websockets may yield an un-downgradable type
1696  drop(upgraded);
1697  unreachable!("unexpected stream type");
1698}
1699
1700#[op2]
1701pub fn op_http_serve_address_override() -> (u8, String, u32, bool) {
1702  if let Ok(val) = std::env::var("DENO_SERVE_ADDRESS") {
1703    return parse_serve_address(&val);
1704  };
1705
1706  if deno_net::tunnel::get_tunnel().is_some() {
1707    return (4, String::new(), 0, true);
1708  }
1709
1710  (0, String::new(), 0, false)
1711}
1712
1713fn parse_serve_address(input: &str) -> (u8, String, u32, bool) {
1714  let (input, duplicate) = match input.strip_prefix("duplicate,") {
1715    Some(input) => (input, true),
1716    None => (input, false),
1717  };
1718  match input.split_once(':') {
1719    Some(("tcp", addr)) => {
1720      // TCP address
1721      match addr.parse::<SocketAddr>() {
1722        Ok(addr) => {
1723          let hostname = match addr {
1724            SocketAddr::V4(v4) => v4.ip().to_string(),
1725            SocketAddr::V6(v6) => format!("[{}]", v6.ip()),
1726          };
1727          (1, hostname, addr.port() as u32, duplicate)
1728        }
1729        Err(_) => {
1730          log::error!("DENO_SERVE_ADDRESS: invalid TCP address: {}", addr);
1731          (0, String::new(), 0, false)
1732        }
1733      }
1734    }
1735    Some(("unix", addr)) => {
1736      // Unix socket path
1737      if addr.is_empty() {
1738        log::error!("DENO_SERVE_ADDRESS: empty unix socket path");
1739        return (0, String::new(), 0, duplicate);
1740      }
1741      (2, addr.to_string(), 0, duplicate)
1742    }
1743    Some(("vsock", addr)) => {
1744      // Vsock address
1745      match addr.split_once(':') {
1746        Some((cid, port)) => {
1747          let cid = if cid == "-1" {
1748            "-1".to_string()
1749          } else {
1750            match cid.parse::<u32>() {
1751              Ok(cid) => cid.to_string(),
1752              Err(_) => {
1753                log::error!("DENO_SERVE_ADDRESS: invalid vsock CID: {}", cid);
1754                return (0, String::new(), 0, false);
1755              }
1756            }
1757          };
1758          let port = match port.parse::<u32>() {
1759            Ok(port) => port,
1760            Err(_) => {
1761              log::error!("DENO_SERVE_ADDRESS: invalid vsock port: {}", port);
1762              return (0, String::new(), 0, false);
1763            }
1764          };
1765          (3, cid, port, duplicate)
1766        }
1767        None => (0, String::new(), 0, false),
1768      }
1769    }
1770    Some(("tunnel", _)) => (4, String::new(), 0, duplicate),
1771    Some((_, _)) | None => {
1772      log::error!("DENO_SERVE_ADDRESS: invalid address format: {}", input);
1773      (0, String::new(), 0, false)
1774    }
1775  }
1776}
1777
/// Process-wide notifier that `op_http_notify_serving` signals on its first
/// call.
pub static SERVE_NOTIFIER: Notify = Notify::const_new();
1779
1780#[op2(fast)]
1781fn op_http_notify_serving() {
1782  static ONCE: AtomicBool = AtomicBool::new(false);
1783
1784  if ONCE
1785    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
1786    .is_ok()
1787  {
1788    SERVE_NOTIFIER.notify_one();
1789  }
1790}
1791
#[cfg(test)]
mod tests {
  use super::*;

  /// Checks `parse_serve_address` against a table of valid inputs and a
  /// list of inputs that must all be rejected.
  #[test]
  fn test_parse_serve_address() {
    // (input, kind, host, port, duplicate)
    let accepted: [(&str, u8, &str, u32, bool); 8] = [
      ("tcp:127.0.0.1:8080", 1, "127.0.0.1", 8080, false),
      ("tcp:[::1]:9000", 1, "[::1]", 9000, false),
      ("duplicate,tcp:[::1]:9000", 1, "[::1]", 9000, true),
      ("unix:/var/run/socket.sock", 2, "/var/run/socket.sock", 0, false),
      (
        "duplicate,unix:/var/run/socket.sock",
        2,
        "/var/run/socket.sock",
        0,
        true,
      ),
      ("vsock:1234:5678", 3, "1234", 5678, false),
      ("vsock:-1:5678", 3, "-1", 5678, false),
      ("duplicate,vsock:-1:5678", 3, "-1", 5678, true),
    ];
    for (input, kind, host, port, duplicate) in accepted {
      assert_eq!(
        parse_serve_address(input),
        (kind, host.to_string(), port, duplicate)
      );
    }

    for input in ["tcp:", "unix:", "vsock:", "foo:", "bar"] {
      assert_eq!(parse_serve_address(input), (0, String::new(), 0, false));
    }
  }
}