deno_http/
lib.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3use std::borrow::Cow;
4use std::cell::RefCell;
5use std::cmp::min;
6use std::error::Error;
7use std::future::pending;
8use std::future::Future;
9use std::future::Pending;
10use std::io;
11use std::io::Write;
12use std::mem::replace;
13use std::mem::take;
14use std::pin::pin;
15use std::pin::Pin;
16use std::rc::Rc;
17use std::sync::Arc;
18use std::task::ready;
19use std::task::Context;
20use std::task::Poll;
21
22use async_compression::tokio::write::BrotliEncoder;
23use async_compression::tokio::write::GzipEncoder;
24use async_compression::Level;
25use base64::prelude::BASE64_STANDARD;
26use base64::Engine;
27use cache_control::CacheControl;
28use deno_core::futures::channel::mpsc;
29use deno_core::futures::channel::oneshot;
30use deno_core::futures::future::select;
31use deno_core::futures::future::Either;
32use deno_core::futures::future::RemoteHandle;
33use deno_core::futures::future::Shared;
34use deno_core::futures::never::Never;
35use deno_core::futures::stream::Peekable;
36use deno_core::futures::FutureExt;
37use deno_core::futures::StreamExt;
38use deno_core::futures::TryFutureExt;
39use deno_core::op2;
40use deno_core::unsync::spawn;
41use deno_core::AsyncRefCell;
42use deno_core::AsyncResult;
43use deno_core::BufView;
44use deno_core::ByteString;
45use deno_core::CancelFuture;
46use deno_core::CancelHandle;
47use deno_core::CancelTryFuture;
48use deno_core::JsBuffer;
49use deno_core::OpState;
50use deno_core::RcRef;
51use deno_core::Resource;
52use deno_core::ResourceId;
53use deno_core::StringOrBuffer;
54use deno_error::JsErrorBox;
55use deno_net::raw::NetworkStream;
56use deno_telemetry::Histogram;
57use deno_telemetry::MeterProvider;
58use deno_telemetry::UpDownCounter;
59use deno_telemetry::OTEL_GLOBALS;
60use deno_websocket::ws_create_server_stream;
61use flate2::write::GzEncoder;
62use flate2::Compression;
63use hyper::server::conn::http1;
64use hyper::server::conn::http2;
65use hyper_util::rt::TokioIo;
66use hyper_v014::body::Bytes;
67use hyper_v014::body::HttpBody;
68use hyper_v014::body::SizeHint;
69use hyper_v014::header::HeaderName;
70use hyper_v014::header::HeaderValue;
71use hyper_v014::server::conn::Http;
72use hyper_v014::service::Service;
73use hyper_v014::Body;
74use hyper_v014::HeaderMap;
75use hyper_v014::Request;
76use hyper_v014::Response;
77use once_cell::sync::OnceCell;
78use serde::Serialize;
79use tokio::io::AsyncRead;
80use tokio::io::AsyncWrite;
81use tokio::io::AsyncWriteExt;
82
83use crate::network_buffered_stream::NetworkBufferedStream;
84use crate::reader_stream::ExternallyAbortableReaderStream;
85use crate::reader_stream::ShutdownHandle;
86
87pub mod compressible;
88mod fly_accept_encoding;
89mod http_next;
90mod network_buffered_stream;
91mod reader_stream;
92mod request_body;
93mod request_properties;
94mod response_body;
95mod service;
96mod websocket_upgrade;
97
98use fly_accept_encoding::Encoding;
99pub use http_next::HttpNextError;
100pub use request_properties::DefaultHttpPropertyExtractor;
101pub use request_properties::HttpConnectionProperties;
102pub use request_properties::HttpListenProperties;
103pub use request_properties::HttpPropertyExtractor;
104pub use request_properties::HttpRequestProperties;
105pub use service::UpgradeUnavailableError;
106pub use websocket_upgrade::WebSocketUpgradeError;
107
/// The OpenTelemetry instruments this module records HTTP server metrics
/// into. A single instance is stored in `OTEL_COLLECTORS`.
struct OtelCollectors {
  /// Histogram built as `http.server.request.duration` (unit `s`).
  duration: Histogram<f64>,
  /// Up/down counter built as `http.server.active_requests`.
  active_requests: UpDownCounter<i64>,
  /// Histogram built as `http.server.request.body.size` (unit `By`).
  request_size: Histogram<u64>,
  /// Histogram built as `http.server.response.body.size` (unit `By`).
  response_size: Histogram<u64>,
}
114
// Metric instruments shared by every request. The cell is filled by the first
// `OtelInfo::new` call (via `get_or_init`) and read with `get()` afterwards.
static OTEL_COLLECTORS: OnceCell<OtelCollectors> = OnceCell::new();
116
/// Configuration for the `deno_http` extension. The extension's `state`
/// initializer stores the value in `OpState`.
#[derive(Debug, Default, Clone, Copy)]
pub struct Options {
  /// By passing a hook function, the caller can customize various configuration
  /// options for the HTTP/2 server.
  /// See [`http2::Builder`] for what parameters can be customized.
  ///
  /// If `None`, the default configuration provided by hyper will be used. Note
  /// that the default configuration is subject to change in future versions.
  pub http2_builder_hook:
    Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>,
  /// By passing a hook function, the caller can customize various configuration
  /// options for the HTTP/1 server.
  /// See [`http1::Builder`] for what parameters can be customized.
  ///
  /// If `None`, the default configuration provided by hyper will be used. Note
  /// that the default configuration is subject to change in future versions.
  pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,

  /// If `false`, the server will abort the request when the response is dropped.
  ///
  /// `false` is also the value produced by the derived `Default`.
  pub no_legacy_abort: bool,
}
138
// Extension definition compiled when the `default_property_extractor` feature
// is disabled: the `HttpPropertyExtractor` implementation is supplied through
// the `HTTP` type parameter, which the generic ops below are instantiated
// with. The initializer stores the given `Options` in the op state.
#[cfg(not(feature = "default_property_extractor"))]
deno_core::extension!(
  deno_http,
  deps = [deno_web, deno_net, deno_fetch, deno_websocket],
  parameters = [ HTTP: HttpPropertyExtractor ],
  ops = [
    op_http_accept,
    op_http_headers,
    op_http_shutdown,
    op_http_upgrade_websocket,
    op_http_websocket_accept_header,
    op_http_write_headers,
    op_http_write_resource,
    op_http_write,
    http_next::op_http_close_after_finish,
    http_next::op_http_get_request_header,
    http_next::op_http_get_request_headers,
    http_next::op_http_request_on_cancel,
    http_next::op_http_get_request_method_and_url<HTTP>,
    http_next::op_http_get_request_cancelled,
    http_next::op_http_read_request_body,
    http_next::op_http_serve_on<HTTP>,
    http_next::op_http_serve<HTTP>,
    http_next::op_http_set_promise_complete,
    http_next::op_http_set_response_body_bytes,
    http_next::op_http_set_response_body_resource,
    http_next::op_http_set_response_body_text,
    http_next::op_http_set_response_header,
    http_next::op_http_set_response_headers,
    http_next::op_http_set_response_trailers,
    http_next::op_http_upgrade_websocket_next,
    http_next::op_http_upgrade_raw,
    http_next::op_raw_write_vectored,
    http_next::op_can_write_vectored,
    http_next::op_http_try_wait,
    http_next::op_http_wait,
    http_next::op_http_close,
    http_next::op_http_cancel,
    http_next::op_http_metric_handle_otel_error,
  ],
  esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
  options = {
    options: Options,
  },
  state = |state, options| {
    state.put::<Options>(options.options);
  }
);
187
// Extension definition compiled when the `default_property_extractor` feature
// is enabled: it declares no type parameter and instantiates the generic ops
// with `DefaultHttpPropertyExtractor`. The initializer stores the given
// `Options` in the op state.
#[cfg(feature = "default_property_extractor")]
deno_core::extension!(
  deno_http,
  deps = [deno_web, deno_net, deno_fetch, deno_websocket],
  ops = [
    op_http_accept,
    op_http_headers,
    op_http_shutdown,
    op_http_upgrade_websocket,
    op_http_websocket_accept_header,
    op_http_write_headers,
    op_http_write_resource,
    op_http_write,
    http_next::op_http_close_after_finish,
    http_next::op_http_get_request_header,
    http_next::op_http_get_request_headers,
    http_next::op_http_request_on_cancel,
    http_next::op_http_get_request_method_and_url<DefaultHttpPropertyExtractor>,
    http_next::op_http_get_request_cancelled,
    http_next::op_http_read_request_body,
    http_next::op_http_serve_on<DefaultHttpPropertyExtractor>,
    http_next::op_http_serve<DefaultHttpPropertyExtractor>,
    http_next::op_http_set_promise_complete,
    http_next::op_http_set_response_body_bytes,
    http_next::op_http_set_response_body_resource,
    http_next::op_http_set_response_body_text,
    http_next::op_http_set_response_header,
    http_next::op_http_set_response_headers,
    http_next::op_http_set_response_trailers,
    http_next::op_http_upgrade_websocket_next,
    http_next::op_http_upgrade_raw,
    http_next::op_raw_write_vectored,
    http_next::op_can_write_vectored,
    http_next::op_http_try_wait,
    http_next::op_http_wait,
    http_next::op_http_close,
    http_next::op_http_cancel,
    http_next::op_http_metric_handle_otel_error,
  ],
  esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
  options = {
    options: Options,
  },
  state = |state, options| {
    state.put::<Options>(options.options);
  }
);
235
/// Errors produced by the HTTP connection and stream resources in this
/// module.
///
/// The `#[class(...)]` attribute on each variant is consumed by the
/// `deno_error::JsError` derive; `#[error(...)]` supplies the `Display` text
/// through `thiserror`.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum HttpError {
  #[class(inherit)]
  #[error(transparent)]
  Resource(#[from] deno_core::error::ResourceError),
  #[class(inherit)]
  #[error(transparent)]
  Canceled(#[from] deno_core::Canceled),
  // The hyper error is wrapped in an `Arc`; there is no `#[from]` conversion,
  // so callers construct this variant explicitly.
  #[class("Http")]
  #[error("{0}")]
  HyperV014(#[source] Arc<hyper_v014::Error>),
  #[class(generic)]
  #[error("{0}")]
  InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
  #[class(generic)]
  #[error("{0}")]
  InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
  #[class(generic)]
  #[error("{0}")]
  Http(#[from] hyper_v014::http::Error),
  #[class("Http")]
  #[error("response headers already sent")]
  ResponseHeadersAlreadySent,
  #[class("Http")]
  #[error("connection closed while sending response")]
  ConnectionClosedWhileSendingResponse,
  #[class("Http")]
  #[error("already in use")]
  AlreadyInUse,
  #[class(inherit)]
  #[error("{0}")]
  Io(#[from] std::io::Error),
  #[class("Http")]
  #[error("no response headers")]
  NoResponseHeaders,
  #[class("Http")]
  #[error("response already completed")]
  ResponseAlreadyCompleted,
  #[class("Http")]
  #[error("cannot upgrade because request body was used")]
  UpgradeBodyUsed,
  #[class("Http")]
  #[error(transparent)]
  Other(#[from] JsErrorBox),
}
281
282pub enum HttpSocketAddr {
283  IpSocket(std::net::SocketAddr),
284  #[cfg(unix)]
285  UnixSocket(tokio::net::unix::SocketAddr),
286}
287
288impl From<std::net::SocketAddr> for HttpSocketAddr {
289  fn from(addr: std::net::SocketAddr) -> Self {
290    Self::IpSocket(addr)
291  }
292}
293
294#[cfg(unix)]
295impl From<tokio::net::unix::SocketAddr> for HttpSocketAddr {
296  fn from(addr: tokio::net::unix::SocketAddr) -> Self {
297    Self::UnixSocket(addr)
298  }
299}
300
/// Per-request telemetry state. Creating it increments the active-request
/// counter; dropping it decrements the counter and records whatever
/// measurements are still pending.
struct OtelInfo {
  /// Attribute values attached to the recorded metrics.
  attributes: OtelInfoAttributes,
  /// Start instant of the request; taken (set to `None`) once the duration
  /// has been recorded.
  duration: Option<std::time::Instant>,
  /// Request body size; taken (set to `None`) once it has been recorded.
  request_size: Option<u64>,
  /// Response body size, recorded when the value is dropped. Starts at
  /// `Some(0)`.
  response_size: Option<u64>,
}
307
/// Values used to build the OpenTelemetry attribute lists for one request.
/// Each field is emitted under the key noted on it.
struct OtelInfoAttributes {
  /// Emitted as `http.request.method`.
  http_request_method: Cow<'static, str>,
  /// Emitted as `network.protocol.version` (histograms only).
  network_protocol_version: &'static str,
  /// Emitted as `url.scheme`.
  url_scheme: Cow<'static, str>,
  /// Emitted as `server.address` when present.
  server_address: Option<String>,
  /// Emitted as `server.port` when present.
  server_port: Option<i64>,
  /// Emitted as `error.type` when present (histograms only).
  error_type: Option<&'static str>,
  /// Emitted as `http.response.status_code` when present (histograms only).
  http_response_status_code: Option<i64>,
}
317
318impl OtelInfoAttributes {
319  fn method(method: &http::method::Method) -> Cow<'static, str> {
320    use http::method::Method;
321
322    match *method {
323      Method::GET => Cow::Borrowed("GET"),
324      Method::POST => Cow::Borrowed("POST"),
325      Method::PUT => Cow::Borrowed("PUT"),
326      Method::DELETE => Cow::Borrowed("DELETE"),
327      Method::HEAD => Cow::Borrowed("HEAD"),
328      Method::OPTIONS => Cow::Borrowed("OPTIONS"),
329      Method::CONNECT => Cow::Borrowed("CONNECT"),
330      Method::PATCH => Cow::Borrowed("PATCH"),
331      Method::TRACE => Cow::Borrowed("TRACE"),
332      _ => Cow::Owned(method.to_string()),
333    }
334  }
335
336  fn method_v02(method: &http_v02::method::Method) -> Cow<'static, str> {
337    use http_v02::method::Method;
338
339    match *method {
340      Method::GET => Cow::Borrowed("GET"),
341      Method::POST => Cow::Borrowed("POST"),
342      Method::PUT => Cow::Borrowed("PUT"),
343      Method::DELETE => Cow::Borrowed("DELETE"),
344      Method::HEAD => Cow::Borrowed("HEAD"),
345      Method::OPTIONS => Cow::Borrowed("OPTIONS"),
346      Method::CONNECT => Cow::Borrowed("CONNECT"),
347      Method::PATCH => Cow::Borrowed("PATCH"),
348      Method::TRACE => Cow::Borrowed("TRACE"),
349      _ => Cow::Owned(method.to_string()),
350    }
351  }
352
353  fn version(version: http::Version) -> &'static str {
354    use http::Version;
355
356    match version {
357      Version::HTTP_09 => "0.9",
358      Version::HTTP_10 => "1.0",
359      Version::HTTP_11 => "1.1",
360      Version::HTTP_2 => "2",
361      Version::HTTP_3 => "3",
362      _ => unreachable!(),
363    }
364  }
365
366  fn version_v02(version: http_v02::Version) -> &'static str {
367    use http_v02::Version;
368
369    match version {
370      Version::HTTP_09 => "0.9",
371      Version::HTTP_10 => "1.0",
372      Version::HTTP_11 => "1.1",
373      Version::HTTP_2 => "2",
374      Version::HTTP_3 => "3",
375      _ => unreachable!(),
376    }
377  }
378
379  fn for_counter(&self) -> Vec<deno_telemetry::KeyValue> {
380    let mut attributes = vec![
381      deno_telemetry::KeyValue::new(
382        "http.request.method",
383        self.http_request_method.clone(),
384      ),
385      deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
386    ];
387
388    if let Some(address) = self.server_address.clone() {
389      attributes.push(deno_telemetry::KeyValue::new("server.address", address));
390    }
391    if let Some(port) = self.server_port {
392      attributes.push(deno_telemetry::KeyValue::new("server.port", port));
393    }
394
395    attributes
396  }
397
398  fn for_histogram(&self) -> Vec<deno_telemetry::KeyValue> {
399    let mut histogram_attributes = vec![
400      deno_telemetry::KeyValue::new(
401        "http.request.method",
402        self.http_request_method.clone(),
403      ),
404      deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
405      deno_telemetry::KeyValue::new(
406        "network.protocol.version",
407        self.network_protocol_version,
408      ),
409    ];
410
411    if let Some(address) = self.server_address.clone() {
412      histogram_attributes
413        .push(deno_telemetry::KeyValue::new("server.address", address));
414    }
415    if let Some(port) = self.server_port {
416      histogram_attributes
417        .push(deno_telemetry::KeyValue::new("server.port", port));
418    }
419    if let Some(status_code) = self.http_response_status_code {
420      histogram_attributes.push(deno_telemetry::KeyValue::new(
421        "http.response.status_code",
422        status_code,
423      ));
424    }
425
426    if let Some(error) = self.error_type {
427      histogram_attributes
428        .push(deno_telemetry::KeyValue::new("error.type", error));
429    }
430
431    histogram_attributes
432  }
433}
434
435impl OtelInfo {
436  fn new(
437    otel: &deno_telemetry::OtelGlobals,
438    instant: std::time::Instant,
439    request_size: u64,
440    attributes: OtelInfoAttributes,
441  ) -> Self {
442    let collectors = OTEL_COLLECTORS.get_or_init(|| {
443      let meter = otel
444        .meter_provider
445        .meter_with_scope(otel.builtin_instrumentation_scope.clone());
446
447      let duration = meter
448        .f64_histogram("http.server.request.duration")
449        .with_unit("s")
450        .with_description("Duration of HTTP server requests.")
451        .with_boundaries(vec![
452          0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0,
453          7.5, 10.0,
454        ])
455        .build();
456
457      let active_requests = meter
458        .i64_up_down_counter("http.server.active_requests")
459        .with_unit("{request}")
460        .with_description("Number of active HTTP server requests.")
461        .build();
462
463      let request_size = meter
464        .u64_histogram("http.server.request.body.size")
465        .with_unit("By")
466        .with_description("Size of HTTP server request bodies.")
467        .with_boundaries(vec![
468          0.0,
469          100.0,
470          1000.0,
471          10000.0,
472          100000.0,
473          1000000.0,
474          10000000.0,
475          100000000.0,
476          1000000000.0,
477        ])
478        .build();
479
480      let response_size = meter
481        .u64_histogram("http.server.response.body.size")
482        .with_unit("By")
483        .with_description("Size of HTTP server response bodies.")
484        .with_boundaries(vec![
485          0.0,
486          100.0,
487          1000.0,
488          10000.0,
489          100000.0,
490          1000000.0,
491          10000000.0,
492          100000000.0,
493          1000000000.0,
494        ])
495        .build();
496
497      OtelCollectors {
498        duration,
499        active_requests,
500        request_size,
501        response_size,
502      }
503    });
504
505    collectors.active_requests.add(1, &attributes.for_counter());
506
507    Self {
508      attributes,
509      duration: Some(instant),
510      request_size: Some(request_size),
511      response_size: Some(0),
512    }
513  }
514
515  fn handle_duration_and_request_size(&mut self) {
516    let collectors = OTEL_COLLECTORS.get().unwrap();
517    let attributes = self.attributes.for_histogram();
518
519    if let Some(duration) = self.duration.take() {
520      let duration = duration.elapsed();
521      collectors
522        .duration
523        .record(duration.as_secs_f64(), &attributes);
524    }
525
526    if let Some(request_size) = self.request_size.take() {
527      let collectors = OTEL_COLLECTORS.get().unwrap();
528      collectors.request_size.record(request_size, &attributes);
529    }
530  }
531}
532
533impl Drop for OtelInfo {
534  fn drop(&mut self) {
535    let collectors = OTEL_COLLECTORS.get().unwrap();
536
537    self.handle_duration_and_request_size();
538
539    collectors
540      .active_requests
541      .add(-1, &self.attributes.for_counter());
542
543    if let Some(response_size) = self.response_size {
544      collectors
545        .response_size
546        .record(response_size, &self.attributes.for_histogram());
547    }
548  }
549}
550
551fn handle_error_otel(
552  otel: &Option<Rc<RefCell<Option<OtelInfo>>>>,
553  error: &HttpError,
554) {
555  if let Some(otel) = otel.as_ref() {
556    let mut maybe_otel_info = otel.borrow_mut();
557    if let Some(otel_info) = maybe_otel_info.as_mut() {
558      otel_info.attributes.error_type = Some(match error {
559        HttpError::Resource(_) => "resource",
560        HttpError::Canceled(_) => "canceled",
561        HttpError::HyperV014(_) => "hyper",
562        HttpError::InvalidHeaderName(_) => "invalid header name",
563        HttpError::InvalidHeaderValue(_) => "invalid header value",
564        HttpError::Http(_) => "http",
565        HttpError::ResponseHeadersAlreadySent => {
566          "response headers already sent"
567        }
568        HttpError::ConnectionClosedWhileSendingResponse => {
569          "connection closed while sending response"
570        }
571        HttpError::AlreadyInUse => "already in use",
572        HttpError::Io(_) => "io",
573        HttpError::NoResponseHeaders => "no response headers",
574        HttpError::ResponseAlreadyCompleted => "response already completed",
575        HttpError::UpgradeBodyUsed => "upgrade body used",
576        HttpError::Other(_) => "unknown",
577      });
578    }
579  }
580}
581
/// Resource representing one HTTP connection served by hyper 0.14.
struct HttpConnResource {
  /// Socket address passed in at construction; used when building request
  /// URLs.
  addr: HttpSocketAddr,
  /// URL scheme passed in at construction; used for request URLs and the
  /// `url.scheme` telemetry attribute.
  scheme: &'static str,
  /// Sends one `HttpAcceptor` to the hyper service per `accept` call.
  acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
  /// Resolves with the final result of the connection task.
  closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper_v014::Error>>>>,
  cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops.
}
589
590impl HttpConnResource {
591  fn new<S>(io: S, scheme: &'static str, addr: HttpSocketAddr) -> Self
592  where
593    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
594  {
595    let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>();
596    let service = HttpService::new(acceptors_rx);
597
598    let conn_fut = Http::new()
599      .with_executor(LocalExecutor)
600      .serve_connection(io, service)
601      .with_upgrades();
602
603    // When the cancel handle is used, the connection shuts down gracefully.
604    // No new HTTP streams will be accepted, but existing streams will be able
605    // to continue operating and eventually shut down cleanly.
606    let cancel_handle = CancelHandle::new_rc();
607    let shutdown_fut = never().or_cancel(&cancel_handle).fuse();
608
609    // A local task that polls the hyper connection future to completion.
610    let task_fut = async move {
611      let conn_fut = pin!(conn_fut);
612      let shutdown_fut = pin!(shutdown_fut);
613      let result = match select(conn_fut, shutdown_fut).await {
614        Either::Left((result, _)) => result,
615        Either::Right((_, mut conn_fut)) => {
616          conn_fut.as_mut().graceful_shutdown();
617          conn_fut.await
618        }
619      };
620      filter_enotconn(result).map_err(Arc::from)
621    };
622    let (task_fut, closed_fut) = task_fut.remote_handle();
623    let closed_fut = closed_fut.shared();
624    spawn(task_fut);
625
626    Self {
627      addr,
628      scheme,
629      acceptors_tx,
630      closed_fut,
631      cancel_handle,
632    }
633  }
634
635  // Accepts a new incoming HTTP request.
636  async fn accept(
637    self: &Rc<Self>,
638  ) -> Result<
639    Option<(
640      HttpStreamReadResource,
641      HttpStreamWriteResource,
642      String,
643      String,
644    )>,
645    HttpError,
646  > {
647    let fut = async {
648      let (request_tx, request_rx) = oneshot::channel();
649      let (response_tx, response_rx) = oneshot::channel();
650
651      let otel_instant = OTEL_GLOBALS
652        .get()
653        .filter(|o| o.has_metrics())
654        .map(|_| std::time::Instant::now());
655
656      let acceptor = HttpAcceptor::new(request_tx, response_rx);
657      self.acceptors_tx.unbounded_send(acceptor).ok()?;
658
659      let request = request_rx.await.ok()?;
660      let accept_encoding = {
661        let encodings =
662          fly_accept_encoding::encodings_iter_http_02(request.headers())
663            .filter(|r| {
664              matches!(r, Ok((Some(Encoding::Brotli | Encoding::Gzip), _)))
665            });
666
667        fly_accept_encoding::preferred(encodings)
668          .ok()
669          .flatten()
670          .unwrap_or(Encoding::Identity)
671      };
672
673      let otel_info =
674        OTEL_GLOBALS.get().filter(|o| o.has_metrics()).map(|otel| {
675          let size_hint = request.size_hint();
676          Rc::new(RefCell::new(Some(OtelInfo::new(
677            otel,
678            otel_instant.unwrap(),
679            size_hint.upper().unwrap_or(size_hint.lower()),
680            OtelInfoAttributes {
681              http_request_method: OtelInfoAttributes::method_v02(
682                request.method(),
683              ),
684              url_scheme: Cow::Borrowed(self.scheme),
685              network_protocol_version: OtelInfoAttributes::version_v02(
686                request.version(),
687              ),
688              server_address: request.uri().host().map(|host| host.to_string()),
689              server_port: request.uri().port_u16().map(|port| port as i64),
690              error_type: Default::default(),
691              http_response_status_code: Default::default(),
692            },
693          ))))
694        });
695
696      let method = request.method().to_string();
697      let url = req_url(&request, self.scheme, &self.addr);
698      let read_stream =
699        HttpStreamReadResource::new(self, request, otel_info.clone());
700      let write_stream = HttpStreamWriteResource::new(
701        self,
702        response_tx,
703        accept_encoding,
704        otel_info,
705      );
706      Some((read_stream, write_stream, method, url))
707    };
708
709    async {
710      match fut.await {
711        Some(stream) => Ok(Some(stream)),
712        // Return the connection error, if any.
713        None => self.closed().map_ok(|_| None).await,
714      }
715    }
716    .try_or_cancel(&self.cancel_handle)
717    .await
718  }
719
720  /// A future that completes when this HTTP connection is closed or errors.
721  async fn closed(&self) -> Result<(), HttpError> {
722    self.closed_fut.clone().map_err(HttpError::HyperV014).await
723  }
724}
725
726impl Resource for HttpConnResource {
727  fn name(&self) -> Cow<str> {
728    "httpConn".into()
729  }
730
731  fn close(self: Rc<Self>) {
732    self.cancel_handle.cancel();
733  }
734}
735
736/// Creates a new HttpConn resource which uses `io` as its transport.
737pub fn http_create_conn_resource<S, A>(
738  state: &mut OpState,
739  io: S,
740  addr: A,
741  scheme: &'static str,
742) -> ResourceId
743where
744  S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
745  A: Into<HttpSocketAddr>,
746{
747  let conn = HttpConnResource::new(io, scheme, addr.into());
748  state.resource_table.add(conn)
749}
750
/// An object that implements the `hyper::Service` trait, through which Hyper
/// delivers incoming HTTP requests.
struct HttpService {
  /// Queue of pending acceptors. It is peekable so that `poll_ready` can
  /// check for an acceptor without consuming it; `call` then takes it.
  acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>,
}
756
757impl HttpService {
758  fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self {
759    let acceptors_rx = acceptors_rx.peekable();
760    Self { acceptors_rx }
761  }
762}
763
764impl Service<Request<Body>> for HttpService {
765  type Response = Response<Body>;
766  type Error = oneshot::Canceled;
767  type Future = oneshot::Receiver<Response<Body>>;
768
769  fn poll_ready(
770    &mut self,
771    cx: &mut Context<'_>,
772  ) -> Poll<Result<(), Self::Error>> {
773    let acceptors_rx = Pin::new(&mut self.acceptors_rx);
774    let result = ready!(acceptors_rx.poll_peek(cx))
775      .map(|_| ())
776      .ok_or(oneshot::Canceled);
777    Poll::Ready(result)
778  }
779
780  fn call(&mut self, request: Request<Body>) -> Self::Future {
781    let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap();
782    acceptor.call(request)
783  }
784}
785
/// A pair of one-shot channels which first transfer a HTTP request from the
/// Hyper service to the HttpConn resource, and then take the Response back to
/// the service.
struct HttpAcceptor {
  /// Sending half used to deliver the incoming request.
  request_tx: oneshot::Sender<Request<Body>>,
  /// Receiving half on which the response is awaited.
  response_rx: oneshot::Receiver<Response<Body>>,
}
793
794impl HttpAcceptor {
795  fn new(
796    request_tx: oneshot::Sender<Request<Body>>,
797    response_rx: oneshot::Receiver<Response<Body>>,
798  ) -> Self {
799    Self {
800      request_tx,
801      response_rx,
802    }
803  }
804
805  fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> {
806    let Self {
807      request_tx,
808      response_rx,
809    } = self;
810    request_tx
811      .send(request)
812      .map(|_| response_rx)
813      .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver.
814  }
815}
816
/// Resource for the request (read) side of one HTTP stream.
pub struct HttpStreamReadResource {
  /// Holds a reference to the owning connection; not read in this type's
  /// visible code.
  _conn: Rc<HttpConnResource>,
  /// Current state of the request reader (headers, body, or closed).
  pub rd: AsyncRefCell<HttpRequestReader>,
  /// Cancels in-flight reads when the resource is closed.
  cancel_handle: CancelHandle,
  /// Size hint of the request body, captured at construction.
  size: SizeHint,
  /// Telemetry state shared with the write side, if metrics are enabled.
  otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
}
824
/// Resource for the response (write) side of one HTTP stream.
pub struct HttpStreamWriteResource {
  /// The connection this stream belongs to.
  conn: Rc<HttpConnResource>,
  /// Current state of the response writer.
  wr: AsyncRefCell<HttpResponseWriter>,
  /// Encoding selected from the request's accept-encoding information when
  /// the request was accepted.
  accept_encoding: Encoding,
  /// Telemetry state shared with the read side, if metrics are enabled.
  otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
}
831
832impl HttpStreamReadResource {
833  fn new(
834    conn: &Rc<HttpConnResource>,
835    request: Request<Body>,
836    otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
837  ) -> Self {
838    let size = request.body().size_hint();
839    Self {
840      _conn: conn.clone(),
841      rd: HttpRequestReader::Headers(request).into(),
842      size,
843      cancel_handle: CancelHandle::new(),
844      otel_info,
845    }
846  }
847}
848
/// Exposes the request body as a readable resource.
impl Resource for HttpStreamReadResource {
  fn name(&self) -> Cow<str> {
    "httpReadStream".into()
  }

  /// Reads up to `limit` bytes of the request body.
  ///
  /// Resolves to an empty `BufView` when the reader is closed or the body
  /// has ended, and to an error when the body stream yields one. The read is
  /// cancelled when the resource's cancel handle is triggered.
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
    Box::pin(async move {
      let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;

      // Obtain the body, first switching the reader from `Headers` to `Body`
      // if needed. The loop runs at most twice: the first pass may perform
      // the switch, the second pass then breaks with the body.
      let body = loop {
        match &mut *rd {
          HttpRequestReader::Headers(_) => {}
          HttpRequestReader::Body(_, body) => break body,
          HttpRequestReader::Closed => return Ok(BufView::empty()),
        }
        // `take` leaves `Closed` (the default) behind for a moment so the
        // request can be moved out and split into headers and body.
        match take(&mut *rd) {
          HttpRequestReader::Headers(request) => {
            let (parts, body) = request.into_parts();
            *rd = HttpRequestReader::Body(parts.headers, body.peekable());
          }
          _ => unreachable!(),
        };
      };

      let fut = async {
        let mut body = Pin::new(body);
        loop {
          match body.as_mut().peek_mut().await {
            // A non-empty chunk: split off at most `limit` bytes and leave
            // the remainder peeked for the next read.
            Some(Ok(chunk)) if !chunk.is_empty() => {
              let len = min(limit, chunk.len());
              let buf = chunk.split_to(len);
              let view = BufView::from(buf);
              break Ok(view);
            }
            // This unwrap is safe because `peek_mut()` returned `Some`, and thus
            // currently has a peeked value that can be synchronously returned
            // from `next()`.
            //
            // The future returned from `next()` is always ready, so we can
            // safely call `await` on it without creating a race condition.
            Some(_) => match body.as_mut().next().await.unwrap() {
              // An exhausted chunk is discarded and the loop continues.
              Ok(chunk) => assert!(chunk.is_empty()),
              Err(err) => {
                break Err(JsErrorBox::from_err(HttpError::HyperV014(
                  Arc::new(err),
                )))
              }
            },
            // End of body.
            None => break Ok(BufView::empty()),
          }
        }
      };

      let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
      fut.try_or_cancel(cancel_handle).await
    })
  }

  /// Cancels any in-flight read.
  fn close(self: Rc<Self>) {
    self.cancel_handle.cancel();
  }

  /// Returns the lower and (optional) upper bound of the request body size
  /// captured at construction.
  fn size_hint(&self) -> (u64, Option<u64>) {
    (self.size.lower(), self.size.upper())
  }
}
915
916impl HttpStreamWriteResource {
917  fn new(
918    conn: &Rc<HttpConnResource>,
919    response_tx: oneshot::Sender<Response<Body>>,
920    accept_encoding: Encoding,
921    otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
922  ) -> Self {
923    Self {
924      conn: conn.clone(),
925      wr: HttpResponseWriter::Headers(response_tx).into(),
926      accept_encoding,
927      otel_info,
928    }
929  }
930}
931
932impl Resource for HttpStreamWriteResource {
933  fn name(&self) -> Cow<str> {
934    "httpWriteStream".into()
935  }
936}
937
938/// The read half of an HTTP stream.
939pub enum HttpRequestReader {
940  Headers(Request<Body>),
941  Body(HeaderMap<HeaderValue>, Peekable<Body>),
942  Closed,
943}
944
945impl Default for HttpRequestReader {
946  fn default() -> Self {
947    Self::Closed
948  }
949}
950
951/// The write half of an HTTP stream.
952enum HttpResponseWriter {
953  Headers(oneshot::Sender<Response<Body>>),
954  Body {
955    writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
956    shutdown_handle: ShutdownHandle,
957  },
958  BodyUncompressed(BodyUncompressedSender),
959  Closed,
960}
961
962impl Default for HttpResponseWriter {
963  fn default() -> Self {
964    Self::Closed
965  }
966}
967
/// Wrapper around a hyper body sender. The sender is kept in an `Option` so
/// that it can be moved out of the wrapper.
struct BodyUncompressedSender(Option<hyper_v014::body::Sender>);
969
970impl BodyUncompressedSender {
971  fn sender(&mut self) -> &mut hyper_v014::body::Sender {
972    // This is safe because we only ever take the sender out of the option
973    // inside of the shutdown method.
974    self.0.as_mut().unwrap()
975  }
976
977  fn shutdown(mut self) {
978    // take the sender out of self so that when self is dropped at the end of
979    // this block, it doesn't get aborted
980    self.0.take();
981  }
982}
983
984impl From<hyper_v014::body::Sender> for BodyUncompressedSender {
985  fn from(sender: hyper_v014::body::Sender) -> Self {
986    BodyUncompressedSender(Some(sender))
987  }
988}
989
990impl Drop for BodyUncompressedSender {
991  fn drop(&mut self) {
992    if let Some(sender) = self.0.take() {
993      sender.abort();
994    }
995  }
996}
997
/// Value returned by `op_http_accept` for each accepted request: the two
/// stream resource ids followed by the request method and URL.
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
  // read_stream_rid:
  ResourceId,
  // write_stream_rid:
  ResourceId,
  // method:
  // This is a String rather than a ByteString because reqwest will only return
  // the method as a str which is guaranteed to be ASCII-only.
  // NOTE(review): this file imports hyper, not reqwest; the comment above
  // presumably means hyper. Confirm and update.
  String,
  // url:
  String,
);
1013
1014#[op2(async)]
1015#[serde]
1016async fn op_http_accept(
1017  state: Rc<RefCell<OpState>>,
1018  #[smi] rid: ResourceId,
1019) -> Result<Option<NextRequestResponse>, HttpError> {
1020  let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
1021
1022  match conn.accept().await {
1023    Ok(Some((read_stream, write_stream, method, url))) => {
1024      let read_stream_rid = state
1025        .borrow_mut()
1026        .resource_table
1027        .add_rc(Rc::new(read_stream));
1028      let write_stream_rid = state
1029        .borrow_mut()
1030        .resource_table
1031        .add_rc(Rc::new(write_stream));
1032      let r =
1033        NextRequestResponse(read_stream_rid, write_stream_rid, method, url);
1034      Ok(Some(r))
1035    }
1036    Ok(None) => Ok(None),
1037    Err(err) => Err(err),
1038  }
1039}
1040
1041fn req_url(
1042  req: &hyper_v014::Request<hyper_v014::Body>,
1043  scheme: &'static str,
1044  addr: &HttpSocketAddr,
1045) -> String {
1046  let host: Cow<str> = match addr {
1047    HttpSocketAddr::IpSocket(addr) => {
1048      if let Some(auth) = req.uri().authority() {
1049        match addr.port() {
1050          443 if scheme == "https" => Cow::Borrowed(auth.host()),
1051          80 if scheme == "http" => Cow::Borrowed(auth.host()),
1052          _ => Cow::Borrowed(auth.as_str()), // Includes port number.
1053        }
1054      } else if let Some(host) = req.uri().host() {
1055        Cow::Borrowed(host)
1056      } else if let Some(host) = req.headers().get("HOST") {
1057        match host.to_str() {
1058          Ok(host) => Cow::Borrowed(host),
1059          Err(_) => Cow::Owned(
1060            host
1061              .as_bytes()
1062              .iter()
1063              .cloned()
1064              .map(char::from)
1065              .collect::<String>(),
1066          ),
1067        }
1068      } else {
1069        Cow::Owned(addr.to_string())
1070      }
1071    }
1072    // There is no standard way for unix domain socket URLs
1073    // nginx and nodejs request use http://unix:[socket_path]:/ but it is not a valid URL
1074    // httpie uses http+unix://[percent_encoding_of_path]/ which we follow
1075    #[cfg(unix)]
1076    HttpSocketAddr::UnixSocket(addr) => Cow::Owned(
1077      percent_encoding::percent_encode(
1078        addr
1079          .as_pathname()
1080          .and_then(|x| x.to_str())
1081          .unwrap_or_default()
1082          .as_bytes(),
1083        percent_encoding::NON_ALPHANUMERIC,
1084      )
1085      .to_string(),
1086    ),
1087  };
1088  let path = req
1089    .uri()
1090    .path_and_query()
1091    .map(|p| p.as_str())
1092    .unwrap_or("/");
1093  [scheme, "://", &host, path].concat()
1094}
1095
1096fn req_headers(
1097  header_map: &HeaderMap<HeaderValue>,
1098) -> Vec<(ByteString, ByteString)> {
1099  // We treat cookies specially, because we don't want them to get them
1100  // mangled by the `Headers` object in JS. What we do is take all cookie
1101  // headers and concat them into a single cookie header, separated by
1102  // semicolons.
1103  let cookie_sep = "; ".as_bytes();
1104  let mut cookies = vec![];
1105
1106  let mut headers = Vec::with_capacity(header_map.len());
1107  for (name, value) in header_map.iter() {
1108    if name == hyper_v014::header::COOKIE {
1109      cookies.push(value.as_bytes());
1110    } else {
1111      let name: &[u8] = name.as_ref();
1112      let value = value.as_bytes();
1113      headers.push((name.into(), value.into()));
1114    }
1115  }
1116
1117  if !cookies.is_empty() {
1118    headers.push(("cookie".into(), cookies.join(cookie_sep).into()));
1119  }
1120
1121  headers
1122}
1123
1124#[op2(async)]
1125async fn op_http_write_headers(
1126  state: Rc<RefCell<OpState>>,
1127  #[smi] rid: u32,
1128  #[smi] status: u16,
1129  #[serde] headers: Vec<(ByteString, ByteString)>,
1130  #[serde] data: Option<StringOrBuffer>,
1131) -> Result<(), HttpError> {
1132  let stream = state
1133    .borrow_mut()
1134    .resource_table
1135    .get::<HttpStreamWriteResource>(rid)?;
1136
1137  // Track supported encoding
1138  let encoding = stream.accept_encoding;
1139
1140  let mut builder = Response::builder();
1141  // SAFETY: can not fail, since a fresh Builder is non-errored
1142  let hmap = unsafe { builder.headers_mut().unwrap_unchecked() };
1143
1144  // Add headers
1145  hmap.reserve(headers.len() + 2);
1146  for (k, v) in headers.into_iter() {
1147    let v: Vec<u8> = v.into();
1148    hmap.append(
1149      HeaderName::try_from(k.as_slice())?,
1150      HeaderValue::try_from(v)?,
1151    );
1152  }
1153  ensure_vary_accept_encoding(hmap);
1154
1155  let accepts_compression =
1156    matches!(encoding, Encoding::Brotli | Encoding::Gzip);
1157  let compressing = accepts_compression
1158    && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
1159    && should_compress(hmap);
1160
1161  if compressing {
1162    weaken_etag(hmap);
1163    // Drop 'content-length' header. Hyper will update it using compressed body.
1164    hmap.remove(hyper_v014::header::CONTENT_LENGTH);
1165    // Content-Encoding header
1166    hmap.insert(
1167      hyper_v014::header::CONTENT_ENCODING,
1168      HeaderValue::from_static(match encoding {
1169        Encoding::Brotli => "br",
1170        Encoding::Gzip => "gzip",
1171        _ => unreachable!(), // Forbidden by accepts_compression
1172      }),
1173    );
1174  }
1175
1176  let (new_wr, body) = http_response(data, compressing, encoding)?;
1177  let body = builder.status(status).body(body)?;
1178
1179  if let Some(otel) = stream.otel_info.as_ref() {
1180    let mut otel = otel.borrow_mut();
1181    if let Some(otel_info) = otel.as_mut() {
1182      otel_info.attributes.http_response_status_code = Some(status as _);
1183      otel_info.handle_duration_and_request_size();
1184    }
1185  }
1186
1187  let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1188  let response_tx = match replace(&mut *old_wr, new_wr) {
1189    HttpResponseWriter::Headers(response_tx) => response_tx,
1190    _ => return Err(HttpError::ResponseHeadersAlreadySent),
1191  };
1192
1193  match response_tx.send(body) {
1194    Ok(_) => Ok(()),
1195    Err(_) => {
1196      stream.conn.closed().await?;
1197      Err(HttpError::ConnectionClosedWhileSendingResponse)
1198    }
1199  }
1200}
1201
1202#[op2]
1203#[serde]
1204fn op_http_headers(
1205  state: &mut OpState,
1206  #[smi] rid: u32,
1207) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
1208  let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
1209  let rd = RcRef::map(&stream, |r| &r.rd)
1210    .try_borrow()
1211    .ok_or(HttpError::AlreadyInUse)
1212    .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?;
1213  match &*rd {
1214    HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
1215    HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
1216    _ => unreachable!(),
1217  }
1218}
1219
1220fn http_response(
1221  data: Option<StringOrBuffer>,
1222  compressing: bool,
1223  encoding: Encoding,
1224) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
1225  // Gzip, after level 1, doesn't produce significant size difference.
1226  // This default matches nginx default gzip compression level (1):
1227  // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
1228  const GZIP_DEFAULT_COMPRESSION_LEVEL: u8 = 1;
1229
1230  match data {
1231    Some(data) if compressing => match encoding {
1232      Encoding::Brotli => {
1233        // quality level 6 is based on google's nginx default value for
1234        // on-the-fly compression
1235        // https://github.com/google/ngx_brotli#brotli_comp_level
1236        // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
1237        // (~4MB)
1238        let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
1239        writer.write_all(&data)?;
1240        Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
1241      }
1242      Encoding::Gzip => {
1243        let mut writer = GzEncoder::new(
1244          Vec::new(),
1245          Compression::new(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1246        );
1247        writer.write_all(&data)?;
1248        Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
1249      }
1250      _ => unreachable!(), // forbidden by accepts_compression
1251    },
1252    Some(data) => {
1253      // If a buffer was passed, but isn't compressible, we use it to
1254      // construct a response body.
1255      Ok((HttpResponseWriter::Closed, data.to_vec().into()))
1256    }
1257    None if compressing => {
1258      // Create a one way pipe that implements tokio's async io traits. To do
1259      // this we create a [tokio::io::DuplexStream], but then throw away one
1260      // of the directions to create a one way pipe.
1261      let (a, b) = tokio::io::duplex(64 * 1024);
1262      let (reader, _) = tokio::io::split(a);
1263      let (_, writer) = tokio::io::split(b);
1264      let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
1265        Encoding::Brotli => {
1266          Box::pin(BrotliEncoder::with_quality(writer, Level::Fastest))
1267        }
1268        Encoding::Gzip => Box::pin(GzipEncoder::with_quality(
1269          writer,
1270          Level::Precise(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1271        )),
1272        _ => unreachable!(), // forbidden by accepts_compression
1273      };
1274      let (stream, shutdown_handle) =
1275        ExternallyAbortableReaderStream::new(reader);
1276      Ok((
1277        HttpResponseWriter::Body {
1278          writer,
1279          shutdown_handle,
1280        },
1281        Body::wrap_stream(stream),
1282      ))
1283    }
1284    None => {
1285      let (body_tx, body_rx) = Body::channel();
1286      Ok((
1287        HttpResponseWriter::BodyUncompressed(body_tx.into()),
1288        body_rx,
1289      ))
1290    }
1291  }
1292}
1293
1294// If user provided a ETag header for uncompressed data, we need to
1295// ensure it is a Weak Etag header ("W/").
1296fn weaken_etag(hmap: &mut hyper_v014::HeaderMap) {
1297  if let Some(etag) = hmap.get_mut(hyper_v014::header::ETAG) {
1298    if !etag.as_bytes().starts_with(b"W/") {
1299      let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
1300      v.extend(b"W/");
1301      v.extend(etag.as_bytes());
1302      *etag = v.try_into().unwrap();
1303    }
1304  }
1305}
1306
1307// Set Vary: Accept-Encoding header for direct body response.
1308// Note: we set the header irrespective of whether or not we compress the data
1309// to make sure cache services do not serve uncompressed data to clients that
1310// support compression.
1311fn ensure_vary_accept_encoding(hmap: &mut hyper_v014::HeaderMap) {
1312  if let Some(v) = hmap.get_mut(hyper_v014::header::VARY) {
1313    if let Ok(s) = v.to_str() {
1314      if !s.to_lowercase().contains("accept-encoding") {
1315        *v = format!("Accept-Encoding, {s}").try_into().unwrap()
1316      }
1317      return;
1318    }
1319  }
1320  hmap.insert(
1321    hyper_v014::header::VARY,
1322    HeaderValue::from_static("Accept-Encoding"),
1323  );
1324}
1325
1326fn should_compress(headers: &hyper_v014::HeaderMap) -> bool {
1327  // skip compression if the cache-control header value is set to "no-transform" or not utf8
1328  fn cache_control_no_transform(
1329    headers: &hyper_v014::HeaderMap,
1330  ) -> Option<bool> {
1331    let v = headers.get(hyper_v014::header::CACHE_CONTROL)?;
1332    let s = match std::str::from_utf8(v.as_bytes()) {
1333      Ok(s) => s,
1334      Err(_) => return Some(true),
1335    };
1336    let c = CacheControl::from_value(s)?;
1337    Some(c.no_transform)
1338  }
1339  // we skip compression if the `content-range` header value is set, as it
1340  // indicates the contents of the body were negotiated based directly
1341  // with the user code and we can't compress the response
1342  let content_range = headers.contains_key(hyper_v014::header::CONTENT_RANGE);
1343  // assume body is already compressed if Content-Encoding header present, thus avoid recompressing
1344  let is_precompressed =
1345    headers.contains_key(hyper_v014::header::CONTENT_ENCODING);
1346
1347  !content_range
1348    && !is_precompressed
1349    && !cache_control_no_transform(headers).unwrap_or_default()
1350    && headers
1351      .get(hyper_v014::header::CONTENT_TYPE)
1352      .map(compressible::is_content_compressible)
1353      .unwrap_or_default()
1354}
1355
/// Streams the contents of the resource `stream` into the response body of
/// the HTTP write stream `rid`, reading chunks of up to 64 KiB until an empty
/// read signals the end of the resource.
///
/// Fails with `HttpError::NoResponseHeaders` if the response head has not
/// been sent yet and with `HttpError::ResponseAlreadyCompleted` if the writer
/// is closed. The state is re-checked before every chunk.
#[op2(async)]
async fn op_http_write_resource(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[smi] stream: ResourceId,
) -> Result<(), HttpError> {
  let http_stream = state
    .borrow()
    .resource_table
    .get::<HttpStreamWriteResource>(rid)?;
  let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
  let resource = state.borrow().resource_table.get_any(stream)?;
  loop {
    // Only the two body states can accept data.
    match *wr {
      HttpResponseWriter::Headers(_) => {
        return Err(HttpError::NoResponseHeaders)
      }
      HttpResponseWriter::Closed => {
        return Err(HttpError::ResponseAlreadyCompleted)
      }
      _ => {}
    };

    let view = resource.clone().read(64 * 1024).await?; // 64KB
    // An empty read marks the end of the source resource.
    if view.is_empty() {
      break;
    }

    match &mut *wr {
      HttpResponseWriter::Body { writer, .. } => {
        // Write the chunk, then flush it.
        let mut result = writer.write_all(&view).await;
        if result.is_ok() {
          result = writer.flush().await;
        }
        if let Err(err) = result {
          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
          // Don't return "broken pipe", that's an implementation detail.
          // Pull up the failure associated with the transport connection instead.
          http_stream.conn.closed().await?;
          // If there was no connection error, drop body_tx.
          // The next loop iteration then returns `ResponseAlreadyCompleted`.
          *wr = HttpResponseWriter::Closed;
        }
      }
      HttpResponseWriter::BodyUncompressed(body) => {
        let bytes = view.to_vec().into();
        if let Err(err) = body.sender().send_data(bytes).await {
          assert!(err.is_closed());
          // Pull up the failure associated with the transport connection instead.
          http_stream.conn.closed().await?;
          // If there was no connection error, drop body_tx.
          // The next loop iteration then returns `ResponseAlreadyCompleted`.
          *wr = HttpResponseWriter::Closed;
        }
      }
      // `Headers` and `Closed` were rejected at the top of the loop.
      _ => unreachable!(),
    };
  }
  Ok(())
}
1414
/// Writes one chunk (`buf`) to the response body of the HTTP write stream
/// `rid`.
///
/// Fails with `HttpError::NoResponseHeaders` if the response head has not
/// been sent yet and with `HttpError::ResponseAlreadyCompleted` if the writer
/// is closed or the write fails without a connection error.
#[op2(async)]
async fn op_http_write(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[buffer] buf: JsBuffer,
) -> Result<(), HttpError> {
  let stream = state
    .borrow()
    .resource_table
    .get::<HttpStreamWriteResource>(rid)?;
  let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;

  // Add the chunk length to the OTel response size, if one is being tracked.
  // This happens before the writer state is checked below.
  if let Some(otel) = stream.otel_info.as_ref() {
    let mut maybe_otel_info = otel.borrow_mut();
    if let Some(otel_info) = maybe_otel_info.as_mut() {
      if let Some(response_size) = otel_info.response_size.as_mut() {
        *response_size += buf.len() as u64;
      }
    }
  }

  match &mut *wr {
    HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
    HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
    HttpResponseWriter::Body { writer, .. } => {
      // Write the chunk, then flush it.
      let mut result = writer.write_all(&buf).await;
      if result.is_ok() {
        result = writer.flush().await;
      }
      match result {
        Ok(_) => Ok(()),
        Err(err) => {
          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
          // Don't return "broken pipe", that's an implementation detail.
          // Pull up the failure associated with the transport connection instead.
          stream.conn.closed().await?;
          // If there was no connection error, drop body_tx.
          *wr = HttpResponseWriter::Closed;
          Err(HttpError::ResponseAlreadyCompleted)
        }
      }
    }
    HttpResponseWriter::BodyUncompressed(body) => {
      let bytes = Bytes::from(buf.to_vec());
      match body.sender().send_data(bytes).await {
        Ok(_) => Ok(()),
        Err(err) => {
          assert!(err.is_closed());
          // Pull up the failure associated with the transport connection instead.
          stream.conn.closed().await?;
          // If there was no connection error, drop body_tx.
          *wr = HttpResponseWriter::Closed;
          Err(HttpError::ResponseAlreadyCompleted)
        }
      }
    }
  }
}
1473
/// Gracefully closes the write half of the HTTP stream. Note that this does not
/// remove the HTTP stream resource from the resource table; it still has to be
/// closed with `Deno.core.close()`.
///
/// The writer is replaced with its default (`Closed`) state. A writer that
/// was in the `Headers` or `Closed` state is simply dropped.
#[op2(async)]
async fn op_http_shutdown(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<(), HttpError> {
  let stream = state
    .borrow()
    .resource_table
    .get::<HttpStreamWriteResource>(rid)?;
  let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
  // Move the current writer out, leaving `Closed` behind.
  let wr = take(&mut *wr);
  match wr {
    HttpResponseWriter::Body {
      mut writer,
      shutdown_handle,
    } => {
      // Signal the shutdown handle first, then shut the writer down.
      shutdown_handle.shutdown();
      match writer.shutdown().await {
        Ok(_) => {}
        Err(err) => {
          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
          // Don't return "broken pipe", that's an implementation detail.
          // Pull up the failure associated with the transport connection instead.
          stream.conn.closed().await?;
        }
      }
    }
    HttpResponseWriter::BodyUncompressed(body) => {
      // Drops the sender without aborting the body.
      body.shutdown();
    }
    _ => {}
  };
  Ok(())
}
1511
1512#[op2]
1513#[string]
1514fn op_http_websocket_accept_header(#[string] key: String) -> String {
1515  let digest = ring::digest::digest(
1516    &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
1517    format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
1518  );
1519  BASE64_STANDARD.encode(digest)
1520}
1521
/// Upgrades the request held by the HTTP read stream `rid` to a WebSocket
/// and returns the resource id of the resulting server stream.
///
/// Fails with `HttpError::UpgradeBodyUsed` if the reader is no longer in the
/// `Headers` state, and with `HttpError::HyperV014` if the upgrade itself
/// fails. Both errors are also reported via `handle_error_otel`.
#[op2(async)]
#[smi]
async fn op_http_upgrade_websocket(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<ResourceId, HttpError> {
  let stream = state
    .borrow_mut()
    .resource_table
    .get::<HttpStreamReadResource>(rid)?;
  let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;

  // The upgrade needs the intact request, which only the `Headers` state has.
  let request = match &mut *rd {
    HttpRequestReader::Headers(request) => request,
    _ => {
      return Err(HttpError::UpgradeBodyUsed)
        .inspect_err(|e| handle_error_otel(&stream.otel_info, e))
    }
  };

  // Wait for the upgrade, then recover the underlying network stream and any
  // bytes that were already read from it.
  let (transport, bytes) = extract_network_stream(
    hyper_v014::upgrade::on(request)
      .await
      .map_err(|err| HttpError::HyperV014(Arc::new(err)))
      .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?,
  );
  Ok(ws_create_server_stream(
    &mut state.borrow_mut(),
    transport,
    bytes,
  ))
}
1554
/// Executor that hands futures to `deno_core::unsync::spawn`.
// Needed so hyper can use non Send futures
#[derive(Clone)]
pub struct LocalExecutor;
1558
// Executor implementation for the legacy hyper 0.14 API.
impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor
where
  Fut: Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `fut` via `deno_core::unsync::spawn`; the returned handle is
  /// discarded.
  fn execute(&self, fut: Fut) {
    deno_core::unsync::spawn(fut);
  }
}
1568
// Executor implementation for the current hyper API.
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
where
  Fut: Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `fut` via `deno_core::unsync::spawn`; the returned handle is
  /// discarded.
  fn execute(&self, fut: Fut) {
    deno_core::unsync::spawn(fut);
  }
}
1578
1579/// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
1580fn filter_enotconn(
1581  result: Result<(), hyper_v014::Error>,
1582) -> Result<(), hyper_v014::Error> {
1583  if result
1584    .as_ref()
1585    .err()
1586    .and_then(|err| err.source())
1587    .and_then(|err| err.downcast_ref::<io::Error>())
1588    .filter(|err| err.kind() == io::ErrorKind::NotConnected)
1589    .is_some()
1590  {
1591    Ok(())
1592  } else {
1593    result
1594  }
1595}
1596
1597/// Create a future that is forever pending.
1598fn never() -> Pending<Never> {
1599  pending()
1600}
1601
/// Abstraction over the upgraded-connection types of both hyper versions
/// used in this file, so the underlying IO object can be recovered uniformly.
trait CanDowncastUpgrade: Sized {
  /// Tries to recover the underlying IO object as a `T`, together with the
  /// bytes that were already read from it. Returns `self` unchanged in `Err`
  /// if the IO object is not a `T`.
  fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
    self,
  ) -> Result<(T, Bytes), Self>;
}
1607
1608impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
1609  fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1610    self,
1611  ) -> Result<(T, Bytes), Self> {
1612    let hyper::upgrade::Parts { io, read_buf, .. } =
1613      self.downcast::<TokioIo<T>>()?;
1614    Ok((io.into_inner(), read_buf))
1615  }
1616}
1617
1618impl CanDowncastUpgrade for hyper_v014::upgrade::Upgraded {
1619  fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1620    self,
1621  ) -> Result<(T, Bytes), Self> {
1622    let hyper_v014::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
1623    Ok((io, read_buf))
1624  }
1625}
1626
1627fn maybe_extract_network_stream<
1628  T: Into<NetworkStream> + AsyncRead + AsyncWrite + Unpin + 'static,
1629  U: CanDowncastUpgrade,
1630>(
1631  upgraded: U,
1632) -> Result<(NetworkStream, Bytes), U> {
1633  let upgraded = match upgraded.downcast::<T>() {
1634    Ok((stream, bytes)) => return Ok((stream.into(), bytes)),
1635    Err(x) => x,
1636  };
1637
1638  match upgraded.downcast::<NetworkBufferedStream<T>>() {
1639    Ok((stream, upgraded_bytes)) => {
1640      // Both the upgrade and the stream might have unread bytes
1641      let (io, stream_bytes) = stream.into_inner();
1642      let bytes = match (stream_bytes.is_empty(), upgraded_bytes.is_empty()) {
1643        (false, false) => Bytes::default(),
1644        (true, false) => upgraded_bytes,
1645        (false, true) => stream_bytes,
1646        (true, true) => {
1647          // The upgraded bytes come first as they have already been read
1648          let mut v = upgraded_bytes.to_vec();
1649          v.append(&mut stream_bytes.to_vec());
1650          Bytes::from(v)
1651        }
1652      };
1653      Ok((io.into(), bytes))
1654    }
1655    Err(x) => Err(x),
1656  }
1657}
1658
1659fn extract_network_stream<U: CanDowncastUpgrade>(
1660  upgraded: U,
1661) -> (NetworkStream, Bytes) {
1662  let upgraded =
1663    match maybe_extract_network_stream::<tokio::net::TcpStream, _>(upgraded) {
1664      Ok(res) => return res,
1665      Err(x) => x,
1666    };
1667  let upgraded =
1668    match maybe_extract_network_stream::<deno_net::ops_tls::TlsStream, _>(
1669      upgraded,
1670    ) {
1671      Ok(res) => return res,
1672      Err(x) => x,
1673    };
1674  #[cfg(unix)]
1675  let upgraded =
1676    match maybe_extract_network_stream::<tokio::net::UnixStream, _>(upgraded) {
1677      Ok(res) => return res,
1678      Err(x) => x,
1679    };
1680  let upgraded =
1681    match maybe_extract_network_stream::<NetworkStream, _>(upgraded) {
1682      Ok(res) => return res,
1683      Err(x) => x,
1684    };
1685
1686  // TODO(mmastrac): HTTP/2 websockets may yield an un-downgradable type
1687  drop(upgraded);
1688  unreachable!("unexpected stream type");
1689}