1use std::borrow::Cow;
4use std::cell::RefCell;
5use std::cmp::min;
6use std::error::Error;
7use std::future::Future;
8use std::future::Pending;
9use std::future::pending;
10use std::io;
11use std::io::Write;
12use std::mem::replace;
13use std::mem::take;
14use std::net::SocketAddr;
15use std::pin::Pin;
16use std::pin::pin;
17use std::rc::Rc;
18use std::sync::Arc;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::task::Context;
22use std::task::Poll;
23use std::task::ready;
24
25use async_compression::Level;
26use async_compression::tokio::write::BrotliEncoder;
27use async_compression::tokio::write::GzipEncoder;
28use base64::Engine;
29use base64::prelude::BASE64_STANDARD;
30use cache_control::CacheControl;
31use deno_core::AsyncRefCell;
32use deno_core::AsyncResult;
33use deno_core::BufView;
34use deno_core::CancelFuture;
35use deno_core::CancelHandle;
36use deno_core::CancelTryFuture;
37use deno_core::JsBuffer;
38use deno_core::OpState;
39use deno_core::RcRef;
40use deno_core::Resource;
41use deno_core::ResourceId;
42use deno_core::StringOrBuffer;
43use deno_core::convert::ByteString;
44use deno_core::futures::FutureExt;
45use deno_core::futures::StreamExt;
46use deno_core::futures::TryFutureExt;
47use deno_core::futures::channel::mpsc;
48use deno_core::futures::channel::oneshot;
49use deno_core::futures::future::Either;
50use deno_core::futures::future::RemoteHandle;
51use deno_core::futures::future::Shared;
52use deno_core::futures::future::select;
53use deno_core::futures::never::Never;
54use deno_core::futures::stream::Peekable;
55use deno_core::op2;
56use deno_core::unsync::spawn;
57use deno_error::JsErrorBox;
58use deno_net::raw::NetworkStream;
59use deno_telemetry::Histogram;
60use deno_telemetry::MeterProvider;
61use deno_telemetry::OTEL_GLOBALS;
62use deno_telemetry::UpDownCounter;
63use deno_websocket::ws_create_server_stream;
64use flate2::Compression;
65use flate2::write::GzEncoder;
66use hyper::server::conn::http1;
67use hyper::server::conn::http2;
68use hyper_util::rt::TokioIo;
69use hyper_v014::Body;
70use hyper_v014::HeaderMap;
71use hyper_v014::Request;
72use hyper_v014::Response;
73use hyper_v014::body::Bytes;
74use hyper_v014::body::HttpBody;
75use hyper_v014::body::SizeHint;
76use hyper_v014::header::HeaderName;
77use hyper_v014::header::HeaderValue;
78use hyper_v014::server::conn::Http;
79use hyper_v014::service::Service;
80use once_cell::sync::OnceCell;
81use tokio::io::AsyncRead;
82use tokio::io::AsyncWrite;
83use tokio::io::AsyncWriteExt;
84use tokio::net::TcpStream;
85use tokio::sync::Notify;
86
87use crate::network_buffered_stream::NetworkBufferedStream;
88use crate::reader_stream::ExternallyAbortableReaderStream;
89use crate::reader_stream::ShutdownHandle;
90
91pub mod compressible;
92mod fly_accept_encoding;
93mod http_next;
94mod network_buffered_stream;
95mod reader_stream;
96mod request_body;
97mod request_properties;
98mod response_body;
99mod service;
100
101use fly_accept_encoding::Encoding;
102pub use http_next::HttpNextError;
103pub use request_properties::DefaultHttpPropertyExtractor;
104pub use request_properties::HttpConnectionProperties;
105pub use request_properties::HttpListenProperties;
106pub use request_properties::HttpPropertyExtractor;
107pub use request_properties::HttpRequestProperties;
108pub use service::UpgradeUnavailableError;
109
/// Metric instruments used to report HTTP server telemetry.
///
/// The instruments are built by `OtelInfo::new` and kept in
/// `OTEL_COLLECTORS`.
struct OtelCollectors {
  /// Histogram fed with request durations.
  duration: Histogram<f64>,
  /// Up/down counter tracking the number of requests in flight.
  active_requests: UpDownCounter<i64>,
  /// Histogram fed with request body sizes.
  request_size: Histogram<u64>,
  /// Histogram fed with response body sizes.
  response_size: Histogram<u64>,
}
116
/// Lazily initialised metric instruments; populated on the first call to
/// `OtelInfo::new`.
static OTEL_COLLECTORS: OnceCell<OtelCollectors> = OnceCell::new();
118
/// Configuration stored in the `OpState` when the `deno_http` extension is
/// initialised.
#[derive(Debug, Default, Clone, Copy)]
pub struct Options {
  /// Optional hook that receives an HTTP/2 connection builder and returns the
  /// builder that should be used.
  // NOTE(review): where the hook is invoked is not visible in this part of
  // the file — confirm against the serving code.
  pub http2_builder_hook:
    Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>,
  /// Optional hook that receives an HTTP/1 connection builder and returns the
  /// builder that should be used.
  pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,

  // NOTE(review): presumably disables the legacy request-abort behaviour;
  // the consumer of this flag is not visible here — confirm.
  pub no_legacy_abort: bool,
}
140
141#[cfg(not(feature = "default_property_extractor"))]
142deno_core::extension!(
143 deno_http,
144 deps = [deno_web, deno_net, deno_fetch, deno_websocket],
145 parameters = [ HTTP: HttpPropertyExtractor ],
146 ops = [
147 op_http_accept,
148 op_http_headers,
149 op_http_serve_address_override,
150 op_http_shutdown,
151 op_http_upgrade_websocket,
152 op_http_websocket_accept_header,
153 op_http_write_headers,
154 op_http_write_resource,
155 op_http_write,
156 http_next::op_http_close_after_finish,
157 http_next::op_http_get_request_header,
158 http_next::op_http_get_request_headers,
159 http_next::op_http_request_on_cancel,
160 http_next::op_http_get_request_method_and_url<HTTP>,
161 http_next::op_http_get_request_cancelled,
162 http_next::op_http_read_request_body,
163 http_next::op_http_serve_on<HTTP>,
164 http_next::op_http_serve<HTTP>,
165 http_next::op_http_set_promise_complete,
166 http_next::op_http_set_response_body_bytes,
167 http_next::op_http_set_response_body_resource,
168 http_next::op_http_set_response_body_text,
169 http_next::op_http_set_response_header,
170 http_next::op_http_set_response_headers,
171 http_next::op_http_set_response_trailers,
172 http_next::op_http_upgrade_websocket_next,
173 http_next::op_http_upgrade_raw,
174 http_next::op_raw_write_vectored,
175 http_next::op_can_write_vectored,
176 http_next::op_http_try_wait,
177 http_next::op_http_wait,
178 http_next::op_http_close,
179 http_next::op_http_cancel,
180 http_next::op_http_metric_handle_otel_error,
181 ],
182 esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
183 options = {
184 options: Options,
185 },
186 state = |state, options| {
187 state.put::<Options>(options.options);
188 }
189);
190
// Extension definition used when the `default_property_extractor` feature is
// enabled: the generic `http_next` ops are instantiated with
// `DefaultHttpPropertyExtractor` instead of an extension parameter.
#[cfg(feature = "default_property_extractor")]
deno_core::extension!(
  deno_http,
  deps = [deno_web, deno_net, deno_fetch, deno_websocket],
  ops = [
    op_http_accept,
    op_http_headers,
    op_http_serve_address_override,
    op_http_shutdown,
    op_http_upgrade_websocket,
    op_http_websocket_accept_header,
    op_http_write_headers,
    op_http_write_resource,
    op_http_write,
    op_http_notify_serving,
    http_next::op_http_close_after_finish,
    http_next::op_http_get_request_header,
    http_next::op_http_get_request_headers,
    http_next::op_http_request_on_cancel,
    http_next::op_http_get_request_method_and_url<DefaultHttpPropertyExtractor>,
    http_next::op_http_get_request_cancelled,
    http_next::op_http_read_request_body,
    http_next::op_http_serve_on<DefaultHttpPropertyExtractor>,
    http_next::op_http_serve<DefaultHttpPropertyExtractor>,
    http_next::op_http_set_promise_complete,
    http_next::op_http_set_response_body_bytes,
    http_next::op_http_set_response_body_resource,
    http_next::op_http_set_response_body_text,
    http_next::op_http_set_response_header,
    http_next::op_http_set_response_headers,
    http_next::op_http_set_response_trailers,
    http_next::op_http_upgrade_websocket_next,
    http_next::op_http_upgrade_raw,
    http_next::op_raw_write_vectored,
    http_next::op_can_write_vectored,
    http_next::op_http_try_wait,
    http_next::op_http_wait,
    http_next::op_http_close,
    http_next::op_http_cancel,
    http_next::op_http_metric_handle_otel_error,
  ],
  esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
  options = {
    options: Options,
  },
  state = |state, options| {
    // Make the embedder-provided options available to the ops.
    state.put::<Options>(options.options);
  }
);
240
/// Errors returned by the HTTP ops defined in this file.
///
/// The `#[class(..)]` attribute on each variant selects the JavaScript error
/// class: `inherit` defers to the wrapped error, `generic` and `"Http"` name
/// the class directly.
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum HttpError {
  // Resource table lookup failed.
  #[class(inherit)]
  #[error(transparent)]
  Resource(#[from] deno_core::error::ResourceError),
  // The operation was cancelled through a cancel handle.
  #[class(inherit)]
  #[error(transparent)]
  Canceled(#[from] deno_core::Canceled),
  // Error reported by hyper 0.14; wrapped in `Arc` so it can be shared.
  #[class("Http")]
  #[error("{0}")]
  HyperV014(#[source] Arc<hyper_v014::Error>),
  #[class(generic)]
  #[error("{0}")]
  InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
  #[class(generic)]
  #[error("{0}")]
  InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
  #[class(generic)]
  #[error("{0}")]
  Http(#[from] hyper_v014::http::Error),
  // Response headers were written more than once for the same stream.
  #[class("Http")]
  #[error("response headers already sent")]
  ResponseHeadersAlreadySent,
  #[class("Http")]
  #[error("connection closed while sending response")]
  ConnectionClosedWhileSendingResponse,
  // The resource is currently borrowed by another operation.
  #[class("Http")]
  #[error("already in use")]
  AlreadyInUse,
  #[class(inherit)]
  #[error("{0}")]
  Io(#[from] std::io::Error),
  #[class("Http")]
  #[error("no response headers")]
  NoResponseHeaders,
  #[class("Http")]
  #[error("response already completed")]
  ResponseAlreadyCompleted,
  #[class("Http")]
  #[error("cannot upgrade because request body was used")]
  UpgradeBodyUsed,
  // Any other error, already boxed as a JS error.
  #[class("Http")]
  #[error(transparent)]
  Other(#[from] JsErrorBox),
}
286
/// Address of the socket an HTTP connection is served on.
pub enum HttpSocketAddr {
  /// TCP/IP socket address.
  IpSocket(std::net::SocketAddr),
  /// Unix domain socket address (Unix targets only).
  #[cfg(unix)]
  UnixSocket(tokio::net::unix::SocketAddr),
}
292
293impl From<std::net::SocketAddr> for HttpSocketAddr {
294 fn from(addr: std::net::SocketAddr) -> Self {
295 Self::IpSocket(addr)
296 }
297}
298
299#[cfg(unix)]
300impl From<tokio::net::unix::SocketAddr> for HttpSocketAddr {
301 fn from(addr: tokio::net::unix::SocketAddr) -> Self {
302 Self::UnixSocket(addr)
303 }
304}
305
/// Per-request telemetry state.
///
/// The `Option` fields are `take()`n or read when the corresponding metric is
/// recorded (see `handle_duration_and_request_size` and the `Drop` impl).
struct OtelInfo {
  /// Attributes attached to the recorded metrics.
  attributes: OtelInfoAttributes,
  /// Start instant of the request; `None` once the duration was recorded.
  duration: Option<std::time::Instant>,
  /// Request body size; `None` once it was recorded.
  request_size: Option<u64>,
  /// Response body size, recorded when the value is dropped.
  response_size: Option<u64>,
}
312
/// Values that are turned into metric attributes by `for_counter` and
/// `for_histogram`.
struct OtelInfoAttributes {
  /// Reported as `http.request.method`.
  http_request_method: Cow<'static, str>,
  /// Reported as `network.protocol.version`.
  network_protocol_version: &'static str,
  /// Reported as `url.scheme`.
  url_scheme: Cow<'static, str>,
  /// Reported as `server.address` when present.
  server_address: Option<String>,
  /// Reported as `server.port` when present.
  server_port: Option<i64>,
  /// Reported as `error.type` when present.
  error_type: Option<&'static str>,
  /// Reported as `http.response.status_code` when present.
  http_response_status_code: Option<i64>,
}
322
323impl OtelInfoAttributes {
324 fn method(method: &http::method::Method) -> Cow<'static, str> {
325 use http::method::Method;
326
327 match *method {
328 Method::GET => Cow::Borrowed("GET"),
329 Method::POST => Cow::Borrowed("POST"),
330 Method::PUT => Cow::Borrowed("PUT"),
331 Method::DELETE => Cow::Borrowed("DELETE"),
332 Method::HEAD => Cow::Borrowed("HEAD"),
333 Method::OPTIONS => Cow::Borrowed("OPTIONS"),
334 Method::CONNECT => Cow::Borrowed("CONNECT"),
335 Method::PATCH => Cow::Borrowed("PATCH"),
336 Method::TRACE => Cow::Borrowed("TRACE"),
337 _ => Cow::Owned(method.to_string()),
338 }
339 }
340
341 fn method_v02(method: &http_v02::method::Method) -> Cow<'static, str> {
342 use http_v02::method::Method;
343
344 match *method {
345 Method::GET => Cow::Borrowed("GET"),
346 Method::POST => Cow::Borrowed("POST"),
347 Method::PUT => Cow::Borrowed("PUT"),
348 Method::DELETE => Cow::Borrowed("DELETE"),
349 Method::HEAD => Cow::Borrowed("HEAD"),
350 Method::OPTIONS => Cow::Borrowed("OPTIONS"),
351 Method::CONNECT => Cow::Borrowed("CONNECT"),
352 Method::PATCH => Cow::Borrowed("PATCH"),
353 Method::TRACE => Cow::Borrowed("TRACE"),
354 _ => Cow::Owned(method.to_string()),
355 }
356 }
357
358 fn version(version: http::Version) -> &'static str {
359 use http::Version;
360
361 match version {
362 Version::HTTP_09 => "0.9",
363 Version::HTTP_10 => "1.0",
364 Version::HTTP_11 => "1.1",
365 Version::HTTP_2 => "2",
366 Version::HTTP_3 => "3",
367 _ => unreachable!(),
368 }
369 }
370
371 fn version_v02(version: http_v02::Version) -> &'static str {
372 use http_v02::Version;
373
374 match version {
375 Version::HTTP_09 => "0.9",
376 Version::HTTP_10 => "1.0",
377 Version::HTTP_11 => "1.1",
378 Version::HTTP_2 => "2",
379 Version::HTTP_3 => "3",
380 _ => unreachable!(),
381 }
382 }
383
384 fn for_counter(&self) -> Vec<deno_telemetry::KeyValue> {
385 let mut attributes = vec![
386 deno_telemetry::KeyValue::new(
387 "http.request.method",
388 self.http_request_method.clone(),
389 ),
390 deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
391 ];
392
393 if let Some(address) = self.server_address.clone() {
394 attributes.push(deno_telemetry::KeyValue::new("server.address", address));
395 }
396 if let Some(port) = self.server_port {
397 attributes.push(deno_telemetry::KeyValue::new("server.port", port));
398 }
399
400 attributes
401 }
402
403 fn for_histogram(&self) -> Vec<deno_telemetry::KeyValue> {
404 let mut histogram_attributes = vec![
405 deno_telemetry::KeyValue::new(
406 "http.request.method",
407 self.http_request_method.clone(),
408 ),
409 deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
410 deno_telemetry::KeyValue::new(
411 "network.protocol.version",
412 self.network_protocol_version,
413 ),
414 ];
415
416 if let Some(address) = self.server_address.clone() {
417 histogram_attributes
418 .push(deno_telemetry::KeyValue::new("server.address", address));
419 }
420 if let Some(port) = self.server_port {
421 histogram_attributes
422 .push(deno_telemetry::KeyValue::new("server.port", port));
423 }
424 if let Some(status_code) = self.http_response_status_code {
425 histogram_attributes.push(deno_telemetry::KeyValue::new(
426 "http.response.status_code",
427 status_code,
428 ));
429 }
430
431 if let Some(error) = self.error_type {
432 histogram_attributes
433 .push(deno_telemetry::KeyValue::new("error.type", error));
434 }
435
436 histogram_attributes
437 }
438}
439
440impl OtelInfo {
441 fn new(
442 otel: &deno_telemetry::OtelGlobals,
443 instant: std::time::Instant,
444 request_size: u64,
445 attributes: OtelInfoAttributes,
446 ) -> Self {
447 let collectors = OTEL_COLLECTORS.get_or_init(|| {
448 let meter = otel
449 .meter_provider
450 .meter_with_scope(otel.builtin_instrumentation_scope.clone());
451
452 let duration = meter
453 .f64_histogram("http.server.request.duration")
454 .with_unit("s")
455 .with_description("Duration of HTTP server requests.")
456 .with_boundaries(vec![
457 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0,
458 7.5, 10.0,
459 ])
460 .build();
461
462 let active_requests = meter
463 .i64_up_down_counter("http.server.active_requests")
464 .with_unit("{request}")
465 .with_description("Number of active HTTP server requests.")
466 .build();
467
468 let request_size = meter
469 .u64_histogram("http.server.request.body.size")
470 .with_unit("By")
471 .with_description("Size of HTTP server request bodies.")
472 .with_boundaries(vec![
473 0.0,
474 100.0,
475 1000.0,
476 10000.0,
477 100000.0,
478 1000000.0,
479 10000000.0,
480 100000000.0,
481 1000000000.0,
482 ])
483 .build();
484
485 let response_size = meter
486 .u64_histogram("http.server.response.body.size")
487 .with_unit("By")
488 .with_description("Size of HTTP server response bodies.")
489 .with_boundaries(vec![
490 0.0,
491 100.0,
492 1000.0,
493 10000.0,
494 100000.0,
495 1000000.0,
496 10000000.0,
497 100000000.0,
498 1000000000.0,
499 ])
500 .build();
501
502 OtelCollectors {
503 duration,
504 active_requests,
505 request_size,
506 response_size,
507 }
508 });
509
510 collectors.active_requests.add(1, &attributes.for_counter());
511
512 Self {
513 attributes,
514 duration: Some(instant),
515 request_size: Some(request_size),
516 response_size: Some(0),
517 }
518 }
519
520 fn handle_duration_and_request_size(&mut self) {
521 let collectors = OTEL_COLLECTORS.get().unwrap();
522 let attributes = self.attributes.for_histogram();
523
524 if let Some(duration) = self.duration.take() {
525 let duration = duration.elapsed();
526 collectors
527 .duration
528 .record(duration.as_secs_f64(), &attributes);
529 }
530
531 if let Some(request_size) = self.request_size.take() {
532 let collectors = OTEL_COLLECTORS.get().unwrap();
533 collectors.request_size.record(request_size, &attributes);
534 }
535 }
536}
537
538impl Drop for OtelInfo {
539 fn drop(&mut self) {
540 let collectors = OTEL_COLLECTORS.get().unwrap();
541
542 self.handle_duration_and_request_size();
543
544 collectors
545 .active_requests
546 .add(-1, &self.attributes.for_counter());
547
548 if let Some(response_size) = self.response_size {
549 collectors
550 .response_size
551 .record(response_size, &self.attributes.for_histogram());
552 }
553 }
554}
555
556fn handle_error_otel(
557 otel: &Option<Rc<RefCell<Option<OtelInfo>>>>,
558 error: &HttpError,
559) {
560 if let Some(otel) = otel.as_ref() {
561 let mut maybe_otel_info = otel.borrow_mut();
562 if let Some(otel_info) = maybe_otel_info.as_mut() {
563 otel_info.attributes.error_type = Some(match error {
564 HttpError::Resource(_) => "resource",
565 HttpError::Canceled(_) => "canceled",
566 HttpError::HyperV014(_) => "hyper",
567 HttpError::InvalidHeaderName(_) => "invalid header name",
568 HttpError::InvalidHeaderValue(_) => "invalid header value",
569 HttpError::Http(_) => "http",
570 HttpError::ResponseHeadersAlreadySent => {
571 "response headers already sent"
572 }
573 HttpError::ConnectionClosedWhileSendingResponse => {
574 "connection closed while sending response"
575 }
576 HttpError::AlreadyInUse => "already in use",
577 HttpError::Io(_) => "io",
578 HttpError::NoResponseHeaders => "no response headers",
579 HttpError::ResponseAlreadyCompleted => "response already completed",
580 HttpError::UpgradeBodyUsed => "upgrade body used",
581 HttpError::Other(_) => "unknown",
582 });
583 }
584 }
585}
586
/// Resource that tracks a single HTTP connection served by hyper 0.14.
struct HttpConnResource {
  /// Socket address passed in at construction; used to build request URLs.
  addr: HttpSocketAddr,
  /// URL scheme passed in at construction; used to build request URLs.
  scheme: &'static str,
  /// Sends one `HttpAcceptor` per `accept()` call to the `HttpService`.
  acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
  /// Resolves with the result of the spawned connection task.
  closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper_v014::Error>>>>,
  /// Cancelled when the resource is closed.
  cancel_handle: Rc<CancelHandle>,
}
594
595impl HttpConnResource {
596 fn new<S>(io: S, scheme: &'static str, addr: HttpSocketAddr) -> Self
597 where
598 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
599 {
600 let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>();
601 let service = HttpService::new(acceptors_rx);
602
603 let conn_fut = Http::new()
604 .with_executor(LocalExecutor)
605 .serve_connection(io, service)
606 .with_upgrades();
607
608 let cancel_handle = CancelHandle::new_rc();
612 let shutdown_fut = never().or_cancel(&cancel_handle).fuse();
613
614 let task_fut = async move {
616 let conn_fut = pin!(conn_fut);
617 let shutdown_fut = pin!(shutdown_fut);
618 let result = match select(conn_fut, shutdown_fut).await {
619 Either::Left((result, _)) => result,
620 Either::Right((_, mut conn_fut)) => {
621 conn_fut.as_mut().graceful_shutdown();
622 conn_fut.await
623 }
624 };
625 filter_enotconn(result).map_err(Arc::from)
626 };
627 let (task_fut, closed_fut) = task_fut.remote_handle();
628 let closed_fut = closed_fut.shared();
629 spawn(task_fut);
630
631 Self {
632 addr,
633 scheme,
634 acceptors_tx,
635 closed_fut,
636 cancel_handle,
637 }
638 }
639
640 async fn accept(
642 self: &Rc<Self>,
643 ) -> Result<
644 Option<(
645 HttpStreamReadResource,
646 HttpStreamWriteResource,
647 String,
648 String,
649 )>,
650 HttpError,
651 > {
652 let fut = async {
653 let (request_tx, request_rx) = oneshot::channel();
654 let (response_tx, response_rx) = oneshot::channel();
655
656 let otel_instant = OTEL_GLOBALS
657 .get()
658 .filter(|o| o.has_metrics())
659 .map(|_| std::time::Instant::now());
660
661 let acceptor = HttpAcceptor::new(request_tx, response_rx);
662 self.acceptors_tx.unbounded_send(acceptor).ok()?;
663
664 let request = request_rx.await.ok()?;
665 let accept_encoding = {
666 let encodings =
667 fly_accept_encoding::encodings_iter_http_02(request.headers())
668 .filter(|r| {
669 matches!(r, Ok((Some(Encoding::Brotli | Encoding::Gzip), _)))
670 });
671
672 fly_accept_encoding::preferred(encodings)
673 .ok()
674 .flatten()
675 .unwrap_or(Encoding::Identity)
676 };
677
678 let otel_info =
679 OTEL_GLOBALS.get().filter(|o| o.has_metrics()).map(|otel| {
680 let size_hint = request.size_hint();
681 Rc::new(RefCell::new(Some(OtelInfo::new(
682 otel,
683 otel_instant.unwrap(),
684 size_hint.upper().unwrap_or(size_hint.lower()),
685 OtelInfoAttributes {
686 http_request_method: OtelInfoAttributes::method_v02(
687 request.method(),
688 ),
689 url_scheme: Cow::Borrowed(self.scheme),
690 network_protocol_version: OtelInfoAttributes::version_v02(
691 request.version(),
692 ),
693 server_address: request.uri().host().map(|host| host.to_string()),
694 server_port: request.uri().port_u16().map(|port| port as i64),
695 error_type: Default::default(),
696 http_response_status_code: Default::default(),
697 },
698 ))))
699 });
700
701 let method = request.method().to_string();
702 let url = req_url(&request, self.scheme, &self.addr);
703 let read_stream =
704 HttpStreamReadResource::new(self, request, otel_info.clone());
705 let write_stream = HttpStreamWriteResource::new(
706 self,
707 response_tx,
708 accept_encoding,
709 otel_info,
710 );
711 Some((read_stream, write_stream, method, url))
712 };
713
714 async {
715 match fut.await {
716 Some(stream) => Ok(Some(stream)),
717 None => self.closed().map_ok(|_| None).await,
719 }
720 }
721 .try_or_cancel(&self.cancel_handle)
722 .await
723 }
724
725 async fn closed(&self) -> Result<(), HttpError> {
727 self.closed_fut.clone().map_err(HttpError::HyperV014).await
728 }
729}
730
731impl Resource for HttpConnResource {
732 fn name(&self) -> Cow<'_, str> {
733 "httpConn".into()
734 }
735
736 fn close(self: Rc<Self>) {
737 self.cancel_handle.cancel();
738 }
739}
740
741pub fn http_create_conn_resource<S, A>(
743 state: &mut OpState,
744 io: S,
745 addr: A,
746 scheme: &'static str,
747) -> ResourceId
748where
749 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
750 A: Into<HttpSocketAddr>,
751{
752 let conn = HttpConnResource::new(io, scheme, addr.into());
753 state.resource_table.add(conn)
754}
755
/// hyper `Service` that hands each incoming request to a queued
/// `HttpAcceptor`.
struct HttpService {
  /// Queue of pending acceptors; peekable so readiness can be checked
  /// without consuming an acceptor.
  acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>,
}
761
762impl HttpService {
763 fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self {
764 let acceptors_rx = acceptors_rx.peekable();
765 Self { acceptors_rx }
766 }
767}
768
769impl Service<Request<Body>> for HttpService {
770 type Response = Response<Body>;
771 type Error = oneshot::Canceled;
772 type Future = oneshot::Receiver<Response<Body>>;
773
774 fn poll_ready(
775 &mut self,
776 cx: &mut Context<'_>,
777 ) -> Poll<Result<(), Self::Error>> {
778 let acceptors_rx = Pin::new(&mut self.acceptors_rx);
779 let result = ready!(acceptors_rx.poll_peek(cx))
780 .map(|_| ())
781 .ok_or(oneshot::Canceled);
782 Poll::Ready(result)
783 }
784
785 fn call(&mut self, request: Request<Body>) -> Self::Future {
786 let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap();
787 acceptor.call(request)
788 }
789}
790
/// Pair of channel ends linking one `accept()` call to the `HttpService`.
struct HttpAcceptor {
  /// Delivers the incoming request to the `accept()` caller.
  request_tx: oneshot::Sender<Request<Body>>,
  /// Receives the response that is handed back to hyper.
  response_rx: oneshot::Receiver<Response<Body>>,
}
798
799impl HttpAcceptor {
800 fn new(
801 request_tx: oneshot::Sender<Request<Body>>,
802 response_rx: oneshot::Receiver<Response<Body>>,
803 ) -> Self {
804 Self {
805 request_tx,
806 response_rx,
807 }
808 }
809
810 fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> {
811 let Self {
812 request_tx,
813 response_rx,
814 } = self;
815 request_tx
816 .send(request)
817 .map(|_| response_rx)
818 .unwrap_or_else(|_| oneshot::channel().1) }
820}
821
/// Resource for reading the request side of one HTTP exchange.
pub struct HttpStreamReadResource {
  /// Owning connection; held but never read through this field.
  _conn: Rc<HttpConnResource>,
  /// Current reader state (headers, body or closed).
  pub rd: AsyncRefCell<HttpRequestReader>,
  /// Cancels in-flight reads when the resource is closed.
  cancel_handle: CancelHandle,
  /// Size hint of the request body, captured at construction.
  size: SizeHint,
  /// Telemetry state shared with the write side, if telemetry is active.
  otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
}
829
/// Resource for writing the response side of one HTTP exchange.
pub struct HttpStreamWriteResource {
  /// Owning connection.
  conn: Rc<HttpConnResource>,
  /// Current writer state.
  wr: AsyncRefCell<HttpResponseWriter>,
  /// Encoding selected from the request headers when it was accepted.
  accept_encoding: Encoding,
  /// Telemetry state shared with the read side, if telemetry is active.
  otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
}
836
837impl HttpStreamReadResource {
838 fn new(
839 conn: &Rc<HttpConnResource>,
840 request: Request<Body>,
841 otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
842 ) -> Self {
843 let size = request.body().size_hint();
844 Self {
845 _conn: conn.clone(),
846 rd: HttpRequestReader::Headers(request).into(),
847 size,
848 cancel_handle: CancelHandle::new(),
849 otel_info,
850 }
851 }
852}
853
impl Resource for HttpStreamReadResource {
  /// Resource name reported for this resource kind.
  fn name(&self) -> Cow<'_, str> {
    "httpReadStream".into()
  }

  /// Reads up to `limit` bytes of the request body.
  ///
  /// Returns an empty view at the end of the body or when the reader is
  /// `Closed`. The read is cancelled when the resource is closed.
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
    Box::pin(async move {
      let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;

      // Make sure the reader is in the `Body` state, converting from
      // `Headers` on first use.
      let body = loop {
        match &mut *rd {
          HttpRequestReader::Headers(_) => {}
          HttpRequestReader::Body(_, body) => break body,
          HttpRequestReader::Closed => return Ok(BufView::empty()),
        }
        // Only the `Headers` state reaches this point; `take` leaves `Closed`
        // behind momentarily before the `Body` state is stored.
        match take(&mut *rd) {
          HttpRequestReader::Headers(request) => {
            let (parts, body) = request.into_parts();
            *rd = HttpRequestReader::Body(parts.headers, body.peekable());
          }
          _ => unreachable!(),
        };
      };

      let fut = async {
        let mut body = Pin::new(body);
        loop {
          match body.as_mut().peek_mut().await {
            // Hand out at most `limit` bytes; the remainder of the chunk
            // stays peeked for the next read.
            Some(Ok(chunk)) if !chunk.is_empty() => {
              let len = min(limit, chunk.len());
              let buf = chunk.split_to(len);
              let view = BufView::from(buf);
              break Ok(view);
            }
            // Either an exhausted (empty) chunk or an error is peeked.
            // `peek_mut()` just yielded `Some`, so `next()` returns that same
            // item and the `unwrap()` cannot fail.
            Some(_) => match body.as_mut().next().await.unwrap() {
              Ok(chunk) => assert!(chunk.is_empty()),
              Err(err) => {
                break Err(JsErrorBox::from_err(HttpError::HyperV014(
                  Arc::new(err),
                )));
              }
            },
            // End of body.
            None => break Ok(BufView::empty()),
          }
        }
      };

      let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
      fut.try_or_cancel(cancel_handle).await
    })
  }

  /// Cancels any in-flight read.
  fn close(self: Rc<Self>) {
    self.cancel_handle.cancel();
  }

  /// Lower and optional upper bound of the body size captured at
  /// construction.
  fn size_hint(&self) -> (u64, Option<u64>) {
    (self.size.lower(), self.size.upper())
  }
}
920
921impl HttpStreamWriteResource {
922 fn new(
923 conn: &Rc<HttpConnResource>,
924 response_tx: oneshot::Sender<Response<Body>>,
925 accept_encoding: Encoding,
926 otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
927 ) -> Self {
928 Self {
929 conn: conn.clone(),
930 wr: HttpResponseWriter::Headers(response_tx).into(),
931 accept_encoding,
932 otel_info,
933 }
934 }
935}
936
937impl Resource for HttpStreamWriteResource {
938 fn name(&self) -> Cow<'_, str> {
939 "httpWriteStream".into()
940 }
941}
942
/// State of the request side of an HTTP exchange.
#[derive(Default)]
pub enum HttpRequestReader {
  /// The request is untouched; headers and body are still together.
  Headers(Request<Body>),
  /// The body is being read; the headers are kept alongside it.
  Body(HeaderMap<HeaderValue>, Peekable<Body>),
  /// Nothing left to read. Also the value left behind by `take()`.
  #[default]
  Closed,
}
951
/// State of the response side of an HTTP exchange.
#[derive(Default)]
enum HttpResponseWriter {
  /// Headers not sent yet; holds the sender that delivers the response.
  Headers(oneshot::Sender<Response<Body>>),
  /// Body written through an `AsyncWrite`.
  // NOTE(review): appears to be the compressed streaming path — confirm
  // against `http_response`.
  Body {
    writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
    shutdown_handle: ShutdownHandle,
  },
  /// Body written through a hyper body sender.
  BodyUncompressed(BodyUncompressedSender),
  /// No further writes are possible. Also the value left behind by `take()`.
  #[default]
  Closed,
}
964
/// Wrapper around a hyper body sender that aborts the body when dropped
/// without `shutdown()` having been called.
struct BodyUncompressedSender(Option<hyper_v014::body::Sender>);
966
967impl BodyUncompressedSender {
968 fn sender(&mut self) -> &mut hyper_v014::body::Sender {
969 self.0.as_mut().unwrap()
972 }
973
974 fn shutdown(mut self) {
975 self.0.take();
978 }
979}
980
981impl From<hyper_v014::body::Sender> for BodyUncompressedSender {
982 fn from(sender: hyper_v014::body::Sender) -> Self {
983 BodyUncompressedSender(Some(sender))
984 }
985}
986
987impl Drop for BodyUncompressedSender {
988 fn drop(&mut self) {
989 if let Some(sender) = self.0.take() {
990 sender.abort();
991 }
992 }
993}
994
/// Value returned by `op_http_accept` for each accepted request.
#[derive(deno_core::ToV8)]
struct NextRequestResponse(
  // Resource id of the read stream.
  ResourceId,
  // Resource id of the write stream.
  ResourceId,
  // Request method.
  String,
  // Request URL.
  String,
);
1009
1010#[op2]
1011async fn op_http_accept(
1012 state: Rc<RefCell<OpState>>,
1013 #[smi] rid: ResourceId,
1014) -> Result<Option<NextRequestResponse>, HttpError> {
1015 let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
1016
1017 match conn.accept().await {
1018 Ok(Some((read_stream, write_stream, method, url))) => {
1019 let read_stream_rid = state
1020 .borrow_mut()
1021 .resource_table
1022 .add_rc(Rc::new(read_stream));
1023 let write_stream_rid = state
1024 .borrow_mut()
1025 .resource_table
1026 .add_rc(Rc::new(write_stream));
1027 let r =
1028 NextRequestResponse(read_stream_rid, write_stream_rid, method, url);
1029 Ok(Some(r))
1030 }
1031 Ok(None) => Ok(None),
1032 Err(err) => Err(err),
1033 }
1034}
1035
1036fn req_url(
1037 req: &hyper_v014::Request<hyper_v014::Body>,
1038 scheme: &'static str,
1039 addr: &HttpSocketAddr,
1040) -> String {
1041 let host: Cow<'_, str> = match addr {
1042 HttpSocketAddr::IpSocket(addr) => {
1043 if let Some(auth) = req.uri().authority() {
1044 match addr.port() {
1045 443 if scheme == "https" => Cow::Borrowed(auth.host()),
1046 80 if scheme == "http" => Cow::Borrowed(auth.host()),
1047 _ => Cow::Borrowed(auth.as_str()), }
1049 } else if let Some(host) = req.uri().host() {
1050 Cow::Borrowed(host)
1051 } else if let Some(host) = req.headers().get("HOST") {
1052 match host.to_str() {
1053 Ok(host) => Cow::Borrowed(host),
1054 Err(_) => Cow::Owned(
1055 host
1056 .as_bytes()
1057 .iter()
1058 .cloned()
1059 .map(char::from)
1060 .collect::<String>(),
1061 ),
1062 }
1063 } else {
1064 Cow::Owned(addr.to_string())
1065 }
1066 }
1067 #[cfg(unix)]
1071 HttpSocketAddr::UnixSocket(addr) => Cow::Owned(
1072 percent_encoding::percent_encode(
1073 addr
1074 .as_pathname()
1075 .and_then(|x| x.to_str())
1076 .unwrap_or_default()
1077 .as_bytes(),
1078 percent_encoding::NON_ALPHANUMERIC,
1079 )
1080 .to_string(),
1081 ),
1082 };
1083 let path = req
1084 .uri()
1085 .path_and_query()
1086 .map(|p| p.as_str())
1087 .unwrap_or("/");
1088 [scheme, "://", &host, path].concat()
1089}
1090
1091fn req_headers(
1092 header_map: &HeaderMap<HeaderValue>,
1093) -> Vec<(ByteString, ByteString)> {
1094 let cookie_sep = "; ".as_bytes();
1099 let mut cookies = vec![];
1100
1101 let mut headers = Vec::with_capacity(header_map.len());
1102 for (name, value) in header_map.iter() {
1103 if name == hyper_v014::header::COOKIE {
1104 cookies.push(value.as_bytes());
1105 } else {
1106 let name: &[u8] = name.as_ref();
1107 let value = value.as_bytes();
1108 headers.push((name.into(), value.into()));
1109 }
1110 }
1111
1112 if !cookies.is_empty() {
1113 headers.push(("cookie".into(), cookies.join(cookie_sep).into()));
1114 }
1115
1116 headers
1117}
1118
1119#[op2]
1120async fn op_http_write_headers(
1121 state: Rc<RefCell<OpState>>,
1122 #[smi] rid: u32,
1123 #[smi] status: u16,
1124 #[scoped] headers: Vec<(ByteString, ByteString)>,
1125 #[serde] data: Option<StringOrBuffer>,
1126) -> Result<(), HttpError> {
1127 let stream = state
1128 .borrow_mut()
1129 .resource_table
1130 .get::<HttpStreamWriteResource>(rid)?;
1131
1132 let encoding = stream.accept_encoding;
1134
1135 let mut builder = Response::builder();
1136 let hmap = unsafe { builder.headers_mut().unwrap_unchecked() };
1138
1139 hmap.reserve(headers.len() + 2);
1141 for (k, v) in headers.into_iter() {
1142 let v: Vec<u8> = v.into();
1143 hmap.append(
1144 HeaderName::try_from(k.as_slice())?,
1145 HeaderValue::try_from(v)?,
1146 );
1147 }
1148 ensure_vary_accept_encoding(hmap);
1149
1150 let accepts_compression =
1151 matches!(encoding, Encoding::Brotli | Encoding::Gzip);
1152 let compressing = accepts_compression
1153 && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
1154 && should_compress(hmap);
1155
1156 if compressing {
1157 weaken_etag(hmap);
1158 hmap.remove(hyper_v014::header::CONTENT_LENGTH);
1160 hmap.insert(
1162 hyper_v014::header::CONTENT_ENCODING,
1163 HeaderValue::from_static(match encoding {
1164 Encoding::Brotli => "br",
1165 Encoding::Gzip => "gzip",
1166 _ => unreachable!(), }),
1168 );
1169 }
1170
1171 let (new_wr, body) = http_response(data, compressing, encoding)?;
1172 let body = builder.status(status).body(body)?;
1173
1174 if let Some(otel) = stream.otel_info.as_ref() {
1175 let mut otel = otel.borrow_mut();
1176 if let Some(otel_info) = otel.as_mut() {
1177 otel_info.attributes.http_response_status_code = Some(status as _);
1178 otel_info.handle_duration_and_request_size();
1179 }
1180 }
1181
1182 let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1183 let response_tx = match replace(&mut *old_wr, new_wr) {
1184 HttpResponseWriter::Headers(response_tx) => response_tx,
1185 _ => return Err(HttpError::ResponseHeadersAlreadySent),
1186 };
1187
1188 match response_tx.send(body) {
1189 Ok(_) => Ok(()),
1190 Err(_) => {
1191 stream.conn.closed().await?;
1192 Err(HttpError::ConnectionClosedWhileSendingResponse)
1193 }
1194 }
1195}
1196
1197#[op2]
1198fn op_http_headers(
1199 state: &mut OpState,
1200 #[smi] rid: u32,
1201) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
1202 let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
1203 let rd = RcRef::map(&stream, |r| &r.rd)
1204 .try_borrow()
1205 .ok_or(HttpError::AlreadyInUse)
1206 .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?;
1207 match &*rd {
1208 HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
1209 HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
1210 _ => unreachable!(),
1211 }
1212}
1213
1214fn http_response(
1215 data: Option<StringOrBuffer>,
1216 compressing: bool,
1217 encoding: Encoding,
1218) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
1219 const GZIP_DEFAULT_COMPRESSION_LEVEL: u8 = 1;
1223
1224 match data {
1225 Some(data) if compressing => match encoding {
1226 Encoding::Brotli => {
1227 let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
1233 writer.write_all(&data)?;
1234 Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
1235 }
1236 Encoding::Gzip => {
1237 let mut writer = GzEncoder::new(
1238 Vec::new(),
1239 Compression::new(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1240 );
1241 writer.write_all(&data)?;
1242 Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
1243 }
1244 _ => unreachable!(), },
1246 Some(data) => {
1247 Ok((HttpResponseWriter::Closed, data.to_vec().into()))
1250 }
1251 None if compressing => {
1252 let (a, b) = tokio::io::duplex(64 * 1024);
1256 let (reader, _) = tokio::io::split(a);
1257 let (_, writer) = tokio::io::split(b);
1258 let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
1259 Encoding::Brotli => {
1260 Box::pin(BrotliEncoder::with_quality(writer, Level::Fastest))
1261 }
1262 Encoding::Gzip => Box::pin(GzipEncoder::with_quality(
1263 writer,
1264 Level::Precise(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1265 )),
1266 _ => unreachable!(), };
1268 let (stream, shutdown_handle) =
1269 ExternallyAbortableReaderStream::new(reader);
1270 Ok((
1271 HttpResponseWriter::Body {
1272 writer,
1273 shutdown_handle,
1274 },
1275 Body::wrap_stream(stream),
1276 ))
1277 }
1278 None => {
1279 let (body_tx, body_rx) = Body::channel();
1280 Ok((
1281 HttpResponseWriter::BodyUncompressed(body_tx.into()),
1282 body_rx,
1283 ))
1284 }
1285 }
1286}
1287
1288fn weaken_etag(hmap: &mut hyper_v014::HeaderMap) {
1291 if let Some(etag) = hmap.get_mut(hyper_v014::header::ETAG)
1292 && !etag.as_bytes().starts_with(b"W/")
1293 {
1294 let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
1295 v.extend(b"W/");
1296 v.extend(etag.as_bytes());
1297 *etag = v.try_into().unwrap();
1298 }
1299}
1300
1301fn ensure_vary_accept_encoding(hmap: &mut hyper_v014::HeaderMap) {
1306 if let Some(v) = hmap.get_mut(hyper_v014::header::VARY)
1307 && let Ok(s) = v.to_str()
1308 {
1309 if !s.to_lowercase().contains("accept-encoding") {
1310 *v = format!("Accept-Encoding, {s}").try_into().unwrap()
1311 }
1312 return;
1313 }
1314 hmap.insert(
1315 hyper_v014::header::VARY,
1316 HeaderValue::from_static("Accept-Encoding"),
1317 );
1318}
1319
1320fn should_compress(headers: &hyper_v014::HeaderMap) -> bool {
1321 fn cache_control_no_transform(
1323 headers: &hyper_v014::HeaderMap,
1324 ) -> Option<bool> {
1325 let v = headers.get(hyper_v014::header::CACHE_CONTROL)?;
1326 let s = match std::str::from_utf8(v.as_bytes()) {
1327 Ok(s) => s,
1328 Err(_) => return Some(true),
1329 };
1330 let c = CacheControl::from_value(s)?;
1331 Some(c.no_transform)
1332 }
1333 let content_range = headers.contains_key(hyper_v014::header::CONTENT_RANGE);
1337 let is_precompressed =
1339 headers.contains_key(hyper_v014::header::CONTENT_ENCODING);
1340
1341 !content_range
1342 && !is_precompressed
1343 && !cache_control_no_transform(headers).unwrap_or_default()
1344 && headers
1345 .get(hyper_v014::header::CONTENT_TYPE)
1346 .map(compressible::is_content_compressible)
1347 .unwrap_or_default()
1348}
1349
1350#[op2]
1351async fn op_http_write_resource(
1352 state: Rc<RefCell<OpState>>,
1353 #[smi] rid: ResourceId,
1354 #[smi] stream: ResourceId,
1355) -> Result<(), HttpError> {
1356 let http_stream = state
1357 .borrow()
1358 .resource_table
1359 .get::<HttpStreamWriteResource>(rid)?;
1360 let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
1361 let resource = state.borrow().resource_table.get_any(stream)?;
1362 loop {
1363 match *wr {
1364 HttpResponseWriter::Headers(_) => {
1365 return Err(HttpError::NoResponseHeaders);
1366 }
1367 HttpResponseWriter::Closed => {
1368 return Err(HttpError::ResponseAlreadyCompleted);
1369 }
1370 _ => {}
1371 };
1372
1373 let view = resource.clone().read(64 * 1024).await?; if view.is_empty() {
1375 break;
1376 }
1377
1378 match &mut *wr {
1379 HttpResponseWriter::Body { writer, .. } => {
1380 let mut result = writer.write_all(&view).await;
1381 if result.is_ok() {
1382 result = writer.flush().await;
1383 }
1384 if let Err(err) = result {
1385 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1386 http_stream.conn.closed().await?;
1389 *wr = HttpResponseWriter::Closed;
1391 }
1392 }
1393 HttpResponseWriter::BodyUncompressed(body) => {
1394 let bytes = view.to_vec().into();
1395 if let Err(err) = body.sender().send_data(bytes).await {
1396 assert!(err.is_closed());
1397 http_stream.conn.closed().await?;
1399 *wr = HttpResponseWriter::Closed;
1401 }
1402 }
1403 _ => unreachable!(),
1404 };
1405 }
1406 Ok(())
1407}
1408
1409#[op2]
1410async fn op_http_write(
1411 state: Rc<RefCell<OpState>>,
1412 #[smi] rid: ResourceId,
1413 #[buffer] buf: JsBuffer,
1414) -> Result<(), HttpError> {
1415 let stream = state
1416 .borrow()
1417 .resource_table
1418 .get::<HttpStreamWriteResource>(rid)?;
1419 let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1420
1421 if let Some(otel) = stream.otel_info.as_ref() {
1422 let mut maybe_otel_info = otel.borrow_mut();
1423 if let Some(otel_info) = maybe_otel_info.as_mut()
1424 && let Some(response_size) = otel_info.response_size.as_mut()
1425 {
1426 *response_size += buf.len() as u64;
1427 }
1428 }
1429
1430 match &mut *wr {
1431 HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
1432 HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
1433 HttpResponseWriter::Body { writer, .. } => {
1434 let mut result = writer.write_all(&buf).await;
1435 if result.is_ok() {
1436 result = writer.flush().await;
1437 }
1438 match result {
1439 Ok(_) => Ok(()),
1440 Err(err) => {
1441 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1442 stream.conn.closed().await?;
1445 *wr = HttpResponseWriter::Closed;
1447 Err(HttpError::ResponseAlreadyCompleted)
1448 }
1449 }
1450 }
1451 HttpResponseWriter::BodyUncompressed(body) => {
1452 let bytes = Bytes::from(buf.to_vec());
1453 match body.sender().send_data(bytes).await {
1454 Ok(_) => Ok(()),
1455 Err(err) => {
1456 assert!(err.is_closed());
1457 stream.conn.closed().await?;
1459 *wr = HttpResponseWriter::Closed;
1461 Err(HttpError::ResponseAlreadyCompleted)
1462 }
1463 }
1464 }
1465 }
1466}
1467
1468#[op2]
1472async fn op_http_shutdown(
1473 state: Rc<RefCell<OpState>>,
1474 #[smi] rid: ResourceId,
1475) -> Result<(), HttpError> {
1476 let stream = state
1477 .borrow()
1478 .resource_table
1479 .get::<HttpStreamWriteResource>(rid)?;
1480 let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1481 let wr = take(&mut *wr);
1482 match wr {
1483 HttpResponseWriter::Body {
1484 mut writer,
1485 shutdown_handle,
1486 } => {
1487 shutdown_handle.shutdown();
1488 match writer.shutdown().await {
1489 Ok(_) => {}
1490 Err(err) => {
1491 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1492 stream.conn.closed().await?;
1495 }
1496 }
1497 }
1498 HttpResponseWriter::BodyUncompressed(body) => {
1499 body.shutdown();
1500 }
1501 _ => {}
1502 };
1503 Ok(())
1504}
1505
1506#[op2]
1507#[string]
1508fn op_http_websocket_accept_header(#[string] key: String) -> String {
1509 let digest = aws_lc_rs::digest::digest(
1510 &aws_lc_rs::digest::SHA1_FOR_LEGACY_USE_ONLY,
1511 format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
1512 );
1513 BASE64_STANDARD.encode(digest)
1514}
1515
1516#[op2]
1517#[smi]
1518async fn op_http_upgrade_websocket(
1519 state: Rc<RefCell<OpState>>,
1520 #[smi] rid: ResourceId,
1521) -> Result<ResourceId, HttpError> {
1522 let stream = state
1523 .borrow_mut()
1524 .resource_table
1525 .get::<HttpStreamReadResource>(rid)?;
1526 let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
1527
1528 let request = match &mut *rd {
1529 HttpRequestReader::Headers(request) => request,
1530 _ => {
1531 return Err(HttpError::UpgradeBodyUsed)
1532 .inspect_err(|e| handle_error_otel(&stream.otel_info, e));
1533 }
1534 };
1535
1536 let (transport, bytes) = extract_network_stream(
1537 hyper_v014::upgrade::on(request)
1538 .await
1539 .map_err(|err| HttpError::HyperV014(Arc::new(err)))
1540 .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?,
1541 );
1542 Ok(ws_create_server_stream(
1543 &mut state.borrow_mut(),
1544 transport,
1545 bytes,
1546 ))
1547}
1548
/// Executor handed to hyper: its `Executor` impls (for both `hyper_v014` and
/// `hyper`) run each future by passing it to `deno_core::unsync::spawn`.
#[derive(Clone)]
pub struct LocalExecutor;
1552
1553impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor
1554where
1555 Fut: Future + 'static,
1556 Fut::Output: 'static,
1557{
1558 fn execute(&self, fut: Fut) {
1559 deno_core::unsync::spawn(fut);
1560 }
1561}
1562
1563impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
1564where
1565 Fut: Future + 'static,
1566 Fut::Output: 'static,
1567{
1568 fn execute(&self, fut: Fut) {
1569 deno_core::unsync::spawn(fut);
1570 }
1571}
1572
1573fn filter_enotconn(
1575 result: Result<(), hyper_v014::Error>,
1576) -> Result<(), hyper_v014::Error> {
1577 if result
1578 .as_ref()
1579 .err()
1580 .and_then(|err| err.source())
1581 .and_then(|err| err.downcast_ref::<io::Error>())
1582 .filter(|err| err.kind() == io::ErrorKind::NotConnected)
1583 .is_some()
1584 {
1585 Ok(())
1586 } else {
1587 result
1588 }
1589}
1590
1591fn never() -> Pending<Never> {
1593 pending()
1594}
1595
1596trait CanDowncastUpgrade: Sized {
1597 fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1598 self,
1599 ) -> Result<(T, Bytes), Self>;
1600}
1601
1602impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
1603 fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1604 self,
1605 ) -> Result<(T, Bytes), Self> {
1606 let hyper::upgrade::Parts { io, read_buf, .. } =
1607 self.downcast::<TokioIo<T>>()?;
1608 Ok((io.into_inner(), read_buf))
1609 }
1610}
1611
1612impl CanDowncastUpgrade for hyper_v014::upgrade::Upgraded {
1613 fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1614 self,
1615 ) -> Result<(T, Bytes), Self> {
1616 let hyper_v014::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
1617 Ok((io, read_buf))
1618 }
1619}
1620
1621fn maybe_extract_network_stream<
1622 T: Into<NetworkStream> + AsyncRead + AsyncWrite + Unpin + 'static,
1623 U: CanDowncastUpgrade,
1624>(
1625 upgraded: U,
1626) -> Result<(NetworkStream, Bytes), U> {
1627 let upgraded = match upgraded.downcast::<T>() {
1628 Ok((stream, bytes)) => return Ok((stream.into(), bytes)),
1629 Err(x) => x,
1630 };
1631
1632 match upgraded.downcast::<NetworkBufferedStream<T>>() {
1633 Ok((stream, upgraded_bytes)) => {
1634 let (io, stream_bytes) = stream.into_inner();
1636 let bytes = match (stream_bytes.is_empty(), upgraded_bytes.is_empty()) {
1637 (false, false) => Bytes::default(),
1638 (true, false) => upgraded_bytes,
1639 (false, true) => stream_bytes,
1640 (true, true) => {
1641 let mut v = upgraded_bytes.to_vec();
1643 v.append(&mut stream_bytes.to_vec());
1644 Bytes::from(v)
1645 }
1646 };
1647 Ok((io.into(), bytes))
1648 }
1649 Err(x) => Err(x),
1650 }
1651}
1652
1653fn extract_network_stream<U: CanDowncastUpgrade>(
1654 upgraded: U,
1655) -> (NetworkStream, Bytes) {
1656 let upgraded =
1657 match maybe_extract_network_stream::<tokio::net::TcpStream, _>(upgraded) {
1658 Ok(res) => return res,
1659 Err(x) => x,
1660 };
1661 let upgraded = match maybe_extract_network_stream::<
1662 deno_net::ops_tls::TlsStream<TcpStream>,
1663 _,
1664 >(upgraded)
1665 {
1666 Ok(res) => return res,
1667 Err(x) => x,
1668 };
1669 #[cfg(unix)]
1670 let upgraded =
1671 match maybe_extract_network_stream::<tokio::net::UnixStream, _>(upgraded) {
1672 Ok(res) => return res,
1673 Err(x) => x,
1674 };
1675 #[cfg(any(target_os = "android", target_os = "linux", target_os = "macos"))]
1676 let upgraded =
1677 match maybe_extract_network_stream::<tokio_vsock::VsockStream, _>(upgraded)
1678 {
1679 Ok(res) => return res,
1680 Err(x) => x,
1681 };
1682 let upgraded =
1683 match maybe_extract_network_stream::<deno_net::tunnel::TunnelStream, _>(
1684 upgraded,
1685 ) {
1686 Ok(res) => return res,
1687 Err(x) => x,
1688 };
1689 let upgraded =
1690 match maybe_extract_network_stream::<NetworkStream, _>(upgraded) {
1691 Ok(res) => return res,
1692 Err(x) => x,
1693 };
1694
1695 drop(upgraded);
1697 unreachable!("unexpected stream type");
1698}
1699
1700#[op2]
1701pub fn op_http_serve_address_override() -> (u8, String, u32, bool) {
1702 if let Ok(val) = std::env::var("DENO_SERVE_ADDRESS") {
1703 return parse_serve_address(&val);
1704 };
1705
1706 if deno_net::tunnel::get_tunnel().is_some() {
1707 return (4, String::new(), 0, true);
1708 }
1709
1710 (0, String::new(), 0, false)
1711}
1712
1713fn parse_serve_address(input: &str) -> (u8, String, u32, bool) {
1714 let (input, duplicate) = match input.strip_prefix("duplicate,") {
1715 Some(input) => (input, true),
1716 None => (input, false),
1717 };
1718 match input.split_once(':') {
1719 Some(("tcp", addr)) => {
1720 match addr.parse::<SocketAddr>() {
1722 Ok(addr) => {
1723 let hostname = match addr {
1724 SocketAddr::V4(v4) => v4.ip().to_string(),
1725 SocketAddr::V6(v6) => format!("[{}]", v6.ip()),
1726 };
1727 (1, hostname, addr.port() as u32, duplicate)
1728 }
1729 Err(_) => {
1730 log::error!("DENO_SERVE_ADDRESS: invalid TCP address: {}", addr);
1731 (0, String::new(), 0, false)
1732 }
1733 }
1734 }
1735 Some(("unix", addr)) => {
1736 if addr.is_empty() {
1738 log::error!("DENO_SERVE_ADDRESS: empty unix socket path");
1739 return (0, String::new(), 0, duplicate);
1740 }
1741 (2, addr.to_string(), 0, duplicate)
1742 }
1743 Some(("vsock", addr)) => {
1744 match addr.split_once(':') {
1746 Some((cid, port)) => {
1747 let cid = if cid == "-1" {
1748 "-1".to_string()
1749 } else {
1750 match cid.parse::<u32>() {
1751 Ok(cid) => cid.to_string(),
1752 Err(_) => {
1753 log::error!("DENO_SERVE_ADDRESS: invalid vsock CID: {}", cid);
1754 return (0, String::new(), 0, false);
1755 }
1756 }
1757 };
1758 let port = match port.parse::<u32>() {
1759 Ok(port) => port,
1760 Err(_) => {
1761 log::error!("DENO_SERVE_ADDRESS: invalid vsock port: {}", port);
1762 return (0, String::new(), 0, false);
1763 }
1764 };
1765 (3, cid, port, duplicate)
1766 }
1767 None => (0, String::new(), 0, false),
1768 }
1769 }
1770 Some(("tunnel", _)) => (4, String::new(), 0, duplicate),
1771 Some((_, _)) | None => {
1772 log::error!("DENO_SERVE_ADDRESS: invalid address format: {}", input);
1773 (0, String::new(), 0, false)
1774 }
1775 }
1776}
1777
/// Process-wide notifier that `op_http_notify_serving` signals with
/// `notify_one` the first time it is called.
pub static SERVE_NOTIFIER: Notify = Notify::const_new();
1779
1780#[op2(fast)]
1781fn op_http_notify_serving() {
1782 static ONCE: AtomicBool = AtomicBool::new(false);
1783
1784 if ONCE
1785 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
1786 .is_ok()
1787 {
1788 SERVE_NOTIFIER.notify_one();
1789 }
1790}
1791
#[cfg(test)]
mod tests {
  use super::*;

  /// Table-driven check of `parse_serve_address` for every supported scheme,
  /// the `duplicate,` prefix, and malformed inputs.
  #[test]
  fn test_parse_serve_address() {
    let rejected = || (0u8, String::new(), 0u32, false);

    let cases: Vec<(&str, (u8, String, u32, bool))> = vec![
      // TCP, IPv4 and IPv6.
      ("tcp:127.0.0.1:8080", (1, "127.0.0.1".to_string(), 8080, false)),
      ("tcp:[::1]:9000", (1, "[::1]".to_string(), 9000, false)),
      ("duplicate,tcp:[::1]:9000", (1, "[::1]".to_string(), 9000, true)),
      // Unix sockets.
      (
        "unix:/var/run/socket.sock",
        (2, "/var/run/socket.sock".to_string(), 0, false),
      ),
      (
        "duplicate,unix:/var/run/socket.sock",
        (2, "/var/run/socket.sock".to_string(), 0, true),
      ),
      // Vsock, including the `-1` CID.
      ("vsock:1234:5678", (3, "1234".to_string(), 5678, false)),
      ("vsock:-1:5678", (3, "-1".to_string(), 5678, false)),
      ("duplicate,vsock:-1:5678", (3, "-1".to_string(), 5678, true)),
      // Malformed inputs.
      ("tcp:", rejected()),
      ("unix:", rejected()),
      ("vsock:", rejected()),
      ("foo:", rejected()),
      ("bar", rejected()),
    ];

    for (input, expected) in cases {
      assert_eq!(parse_serve_address(input), expected, "input: {input}");
    }
  }
}