1use std::borrow::Cow;
4use std::cell::RefCell;
5use std::cmp::min;
6use std::error::Error;
7use std::future::pending;
8use std::future::Future;
9use std::future::Pending;
10use std::io;
11use std::io::Write;
12use std::mem::replace;
13use std::mem::take;
14use std::pin::pin;
15use std::pin::Pin;
16use std::rc::Rc;
17use std::sync::Arc;
18use std::task::ready;
19use std::task::Context;
20use std::task::Poll;
21
22use async_compression::tokio::write::BrotliEncoder;
23use async_compression::tokio::write::GzipEncoder;
24use async_compression::Level;
25use base64::prelude::BASE64_STANDARD;
26use base64::Engine;
27use cache_control::CacheControl;
28use deno_core::futures::channel::mpsc;
29use deno_core::futures::channel::oneshot;
30use deno_core::futures::future::select;
31use deno_core::futures::future::Either;
32use deno_core::futures::future::RemoteHandle;
33use deno_core::futures::future::Shared;
34use deno_core::futures::never::Never;
35use deno_core::futures::stream::Peekable;
36use deno_core::futures::FutureExt;
37use deno_core::futures::StreamExt;
38use deno_core::futures::TryFutureExt;
39use deno_core::op2;
40use deno_core::unsync::spawn;
41use deno_core::AsyncRefCell;
42use deno_core::AsyncResult;
43use deno_core::BufView;
44use deno_core::ByteString;
45use deno_core::CancelFuture;
46use deno_core::CancelHandle;
47use deno_core::CancelTryFuture;
48use deno_core::JsBuffer;
49use deno_core::OpState;
50use deno_core::RcRef;
51use deno_core::Resource;
52use deno_core::ResourceId;
53use deno_core::StringOrBuffer;
54use deno_error::JsErrorBox;
55use deno_net::raw::NetworkStream;
56use deno_telemetry::Histogram;
57use deno_telemetry::MeterProvider;
58use deno_telemetry::UpDownCounter;
59use deno_telemetry::OTEL_GLOBALS;
60use deno_websocket::ws_create_server_stream;
61use flate2::write::GzEncoder;
62use flate2::Compression;
63use hyper::server::conn::http1;
64use hyper::server::conn::http2;
65use hyper_util::rt::TokioIo;
66use hyper_v014::body::Bytes;
67use hyper_v014::body::HttpBody;
68use hyper_v014::body::SizeHint;
69use hyper_v014::header::HeaderName;
70use hyper_v014::header::HeaderValue;
71use hyper_v014::server::conn::Http;
72use hyper_v014::service::Service;
73use hyper_v014::Body;
74use hyper_v014::HeaderMap;
75use hyper_v014::Request;
76use hyper_v014::Response;
77use once_cell::sync::OnceCell;
78use serde::Serialize;
79use tokio::io::AsyncRead;
80use tokio::io::AsyncWrite;
81use tokio::io::AsyncWriteExt;
82
83use crate::network_buffered_stream::NetworkBufferedStream;
84use crate::reader_stream::ExternallyAbortableReaderStream;
85use crate::reader_stream::ShutdownHandle;
86
87pub mod compressible;
88mod fly_accept_encoding;
89mod http_next;
90mod network_buffered_stream;
91mod reader_stream;
92mod request_body;
93mod request_properties;
94mod response_body;
95mod service;
96mod websocket_upgrade;
97
98use fly_accept_encoding::Encoding;
99pub use http_next::HttpNextError;
100pub use request_properties::DefaultHttpPropertyExtractor;
101pub use request_properties::HttpConnectionProperties;
102pub use request_properties::HttpListenProperties;
103pub use request_properties::HttpPropertyExtractor;
104pub use request_properties::HttpRequestProperties;
105pub use service::UpgradeUnavailableError;
106pub use websocket_upgrade::WebSocketUpgradeError;
107
108struct OtelCollectors {
109 duration: Histogram<f64>,
110 active_requests: UpDownCounter<i64>,
111 request_size: Histogram<u64>,
112 response_size: Histogram<u64>,
113}
114
115static OTEL_COLLECTORS: OnceCell<OtelCollectors> = OnceCell::new();
116
117#[derive(Debug, Default, Clone, Copy)]
118pub struct Options {
119 pub http2_builder_hook:
126 Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>,
127 pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,
134
135 pub no_legacy_abort: bool,
137}
138
139#[cfg(not(feature = "default_property_extractor"))]
140deno_core::extension!(
141 deno_http,
142 deps = [deno_web, deno_net, deno_fetch, deno_websocket],
143 parameters = [ HTTP: HttpPropertyExtractor ],
144 ops = [
145 op_http_accept,
146 op_http_headers,
147 op_http_shutdown,
148 op_http_upgrade_websocket,
149 op_http_websocket_accept_header,
150 op_http_write_headers,
151 op_http_write_resource,
152 op_http_write,
153 http_next::op_http_close_after_finish,
154 http_next::op_http_get_request_header,
155 http_next::op_http_get_request_headers,
156 http_next::op_http_request_on_cancel,
157 http_next::op_http_get_request_method_and_url<HTTP>,
158 http_next::op_http_get_request_cancelled,
159 http_next::op_http_read_request_body,
160 http_next::op_http_serve_on<HTTP>,
161 http_next::op_http_serve<HTTP>,
162 http_next::op_http_set_promise_complete,
163 http_next::op_http_set_response_body_bytes,
164 http_next::op_http_set_response_body_resource,
165 http_next::op_http_set_response_body_text,
166 http_next::op_http_set_response_header,
167 http_next::op_http_set_response_headers,
168 http_next::op_http_set_response_trailers,
169 http_next::op_http_upgrade_websocket_next,
170 http_next::op_http_upgrade_raw,
171 http_next::op_raw_write_vectored,
172 http_next::op_can_write_vectored,
173 http_next::op_http_try_wait,
174 http_next::op_http_wait,
175 http_next::op_http_close,
176 http_next::op_http_cancel,
177 http_next::op_http_metric_handle_otel_error,
178 ],
179 esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
180 options = {
181 options: Options,
182 },
183 state = |state, options| {
184 state.put::<Options>(options.options);
185 }
186);
187
188#[cfg(feature = "default_property_extractor")]
189deno_core::extension!(
190 deno_http,
191 deps = [deno_web, deno_net, deno_fetch, deno_websocket],
192 ops = [
193 op_http_accept,
194 op_http_headers,
195 op_http_shutdown,
196 op_http_upgrade_websocket,
197 op_http_websocket_accept_header,
198 op_http_write_headers,
199 op_http_write_resource,
200 op_http_write,
201 http_next::op_http_close_after_finish,
202 http_next::op_http_get_request_header,
203 http_next::op_http_get_request_headers,
204 http_next::op_http_request_on_cancel,
205 http_next::op_http_get_request_method_and_url<DefaultHttpPropertyExtractor>,
206 http_next::op_http_get_request_cancelled,
207 http_next::op_http_read_request_body,
208 http_next::op_http_serve_on<DefaultHttpPropertyExtractor>,
209 http_next::op_http_serve<DefaultHttpPropertyExtractor>,
210 http_next::op_http_set_promise_complete,
211 http_next::op_http_set_response_body_bytes,
212 http_next::op_http_set_response_body_resource,
213 http_next::op_http_set_response_body_text,
214 http_next::op_http_set_response_header,
215 http_next::op_http_set_response_headers,
216 http_next::op_http_set_response_trailers,
217 http_next::op_http_upgrade_websocket_next,
218 http_next::op_http_upgrade_raw,
219 http_next::op_raw_write_vectored,
220 http_next::op_can_write_vectored,
221 http_next::op_http_try_wait,
222 http_next::op_http_wait,
223 http_next::op_http_close,
224 http_next::op_http_cancel,
225 http_next::op_http_metric_handle_otel_error,
226 ],
227 esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
228 options = {
229 options: Options,
230 },
231 state = |state, options| {
232 state.put::<Options>(options.options);
233 }
234);
235
236#[derive(Debug, thiserror::Error, deno_error::JsError)]
237pub enum HttpError {
238 #[class(inherit)]
239 #[error(transparent)]
240 Resource(#[from] deno_core::error::ResourceError),
241 #[class(inherit)]
242 #[error(transparent)]
243 Canceled(#[from] deno_core::Canceled),
244 #[class("Http")]
245 #[error("{0}")]
246 HyperV014(#[source] Arc<hyper_v014::Error>),
247 #[class(generic)]
248 #[error("{0}")]
249 InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
250 #[class(generic)]
251 #[error("{0}")]
252 InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
253 #[class(generic)]
254 #[error("{0}")]
255 Http(#[from] hyper_v014::http::Error),
256 #[class("Http")]
257 #[error("response headers already sent")]
258 ResponseHeadersAlreadySent,
259 #[class("Http")]
260 #[error("connection closed while sending response")]
261 ConnectionClosedWhileSendingResponse,
262 #[class("Http")]
263 #[error("already in use")]
264 AlreadyInUse,
265 #[class(inherit)]
266 #[error("{0}")]
267 Io(#[from] std::io::Error),
268 #[class("Http")]
269 #[error("no response headers")]
270 NoResponseHeaders,
271 #[class("Http")]
272 #[error("response already completed")]
273 ResponseAlreadyCompleted,
274 #[class("Http")]
275 #[error("cannot upgrade because request body was used")]
276 UpgradeBodyUsed,
277 #[class("Http")]
278 #[error(transparent)]
279 Other(#[from] JsErrorBox),
280}
281
282pub enum HttpSocketAddr {
283 IpSocket(std::net::SocketAddr),
284 #[cfg(unix)]
285 UnixSocket(tokio::net::unix::SocketAddr),
286}
287
288impl From<std::net::SocketAddr> for HttpSocketAddr {
289 fn from(addr: std::net::SocketAddr) -> Self {
290 Self::IpSocket(addr)
291 }
292}
293
294#[cfg(unix)]
295impl From<tokio::net::unix::SocketAddr> for HttpSocketAddr {
296 fn from(addr: tokio::net::unix::SocketAddr) -> Self {
297 Self::UnixSocket(addr)
298 }
299}
300
301struct OtelInfo {
302 attributes: OtelInfoAttributes,
303 duration: Option<std::time::Instant>,
304 request_size: Option<u64>,
305 response_size: Option<u64>,
306}
307
308struct OtelInfoAttributes {
309 http_request_method: Cow<'static, str>,
310 network_protocol_version: &'static str,
311 url_scheme: Cow<'static, str>,
312 server_address: Option<String>,
313 server_port: Option<i64>,
314 error_type: Option<&'static str>,
315 http_response_status_code: Option<i64>,
316}
317
318impl OtelInfoAttributes {
319 fn method(method: &http::method::Method) -> Cow<'static, str> {
320 use http::method::Method;
321
322 match *method {
323 Method::GET => Cow::Borrowed("GET"),
324 Method::POST => Cow::Borrowed("POST"),
325 Method::PUT => Cow::Borrowed("PUT"),
326 Method::DELETE => Cow::Borrowed("DELETE"),
327 Method::HEAD => Cow::Borrowed("HEAD"),
328 Method::OPTIONS => Cow::Borrowed("OPTIONS"),
329 Method::CONNECT => Cow::Borrowed("CONNECT"),
330 Method::PATCH => Cow::Borrowed("PATCH"),
331 Method::TRACE => Cow::Borrowed("TRACE"),
332 _ => Cow::Owned(method.to_string()),
333 }
334 }
335
336 fn method_v02(method: &http_v02::method::Method) -> Cow<'static, str> {
337 use http_v02::method::Method;
338
339 match *method {
340 Method::GET => Cow::Borrowed("GET"),
341 Method::POST => Cow::Borrowed("POST"),
342 Method::PUT => Cow::Borrowed("PUT"),
343 Method::DELETE => Cow::Borrowed("DELETE"),
344 Method::HEAD => Cow::Borrowed("HEAD"),
345 Method::OPTIONS => Cow::Borrowed("OPTIONS"),
346 Method::CONNECT => Cow::Borrowed("CONNECT"),
347 Method::PATCH => Cow::Borrowed("PATCH"),
348 Method::TRACE => Cow::Borrowed("TRACE"),
349 _ => Cow::Owned(method.to_string()),
350 }
351 }
352
353 fn version(version: http::Version) -> &'static str {
354 use http::Version;
355
356 match version {
357 Version::HTTP_09 => "0.9",
358 Version::HTTP_10 => "1.0",
359 Version::HTTP_11 => "1.1",
360 Version::HTTP_2 => "2",
361 Version::HTTP_3 => "3",
362 _ => unreachable!(),
363 }
364 }
365
366 fn version_v02(version: http_v02::Version) -> &'static str {
367 use http_v02::Version;
368
369 match version {
370 Version::HTTP_09 => "0.9",
371 Version::HTTP_10 => "1.0",
372 Version::HTTP_11 => "1.1",
373 Version::HTTP_2 => "2",
374 Version::HTTP_3 => "3",
375 _ => unreachable!(),
376 }
377 }
378
379 fn for_counter(&self) -> Vec<deno_telemetry::KeyValue> {
380 let mut attributes = vec![
381 deno_telemetry::KeyValue::new(
382 "http.request.method",
383 self.http_request_method.clone(),
384 ),
385 deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
386 ];
387
388 if let Some(address) = self.server_address.clone() {
389 attributes.push(deno_telemetry::KeyValue::new("server.address", address));
390 }
391 if let Some(port) = self.server_port {
392 attributes.push(deno_telemetry::KeyValue::new("server.port", port));
393 }
394
395 attributes
396 }
397
398 fn for_histogram(&self) -> Vec<deno_telemetry::KeyValue> {
399 let mut histogram_attributes = vec![
400 deno_telemetry::KeyValue::new(
401 "http.request.method",
402 self.http_request_method.clone(),
403 ),
404 deno_telemetry::KeyValue::new("url.scheme", self.url_scheme.clone()),
405 deno_telemetry::KeyValue::new(
406 "network.protocol.version",
407 self.network_protocol_version,
408 ),
409 ];
410
411 if let Some(address) = self.server_address.clone() {
412 histogram_attributes
413 .push(deno_telemetry::KeyValue::new("server.address", address));
414 }
415 if let Some(port) = self.server_port {
416 histogram_attributes
417 .push(deno_telemetry::KeyValue::new("server.port", port));
418 }
419 if let Some(status_code) = self.http_response_status_code {
420 histogram_attributes.push(deno_telemetry::KeyValue::new(
421 "http.response.status_code",
422 status_code,
423 ));
424 }
425
426 if let Some(error) = self.error_type {
427 histogram_attributes
428 .push(deno_telemetry::KeyValue::new("error.type", error));
429 }
430
431 histogram_attributes
432 }
433}
434
435impl OtelInfo {
436 fn new(
437 otel: &deno_telemetry::OtelGlobals,
438 instant: std::time::Instant,
439 request_size: u64,
440 attributes: OtelInfoAttributes,
441 ) -> Self {
442 let collectors = OTEL_COLLECTORS.get_or_init(|| {
443 let meter = otel
444 .meter_provider
445 .meter_with_scope(otel.builtin_instrumentation_scope.clone());
446
447 let duration = meter
448 .f64_histogram("http.server.request.duration")
449 .with_unit("s")
450 .with_description("Duration of HTTP server requests.")
451 .with_boundaries(vec![
452 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0,
453 7.5, 10.0,
454 ])
455 .build();
456
457 let active_requests = meter
458 .i64_up_down_counter("http.server.active_requests")
459 .with_unit("{request}")
460 .with_description("Number of active HTTP server requests.")
461 .build();
462
463 let request_size = meter
464 .u64_histogram("http.server.request.body.size")
465 .with_unit("By")
466 .with_description("Size of HTTP server request bodies.")
467 .with_boundaries(vec![
468 0.0,
469 100.0,
470 1000.0,
471 10000.0,
472 100000.0,
473 1000000.0,
474 10000000.0,
475 100000000.0,
476 1000000000.0,
477 ])
478 .build();
479
480 let response_size = meter
481 .u64_histogram("http.server.response.body.size")
482 .with_unit("By")
483 .with_description("Size of HTTP server response bodies.")
484 .with_boundaries(vec![
485 0.0,
486 100.0,
487 1000.0,
488 10000.0,
489 100000.0,
490 1000000.0,
491 10000000.0,
492 100000000.0,
493 1000000000.0,
494 ])
495 .build();
496
497 OtelCollectors {
498 duration,
499 active_requests,
500 request_size,
501 response_size,
502 }
503 });
504
505 collectors.active_requests.add(1, &attributes.for_counter());
506
507 Self {
508 attributes,
509 duration: Some(instant),
510 request_size: Some(request_size),
511 response_size: Some(0),
512 }
513 }
514
515 fn handle_duration_and_request_size(&mut self) {
516 let collectors = OTEL_COLLECTORS.get().unwrap();
517 let attributes = self.attributes.for_histogram();
518
519 if let Some(duration) = self.duration.take() {
520 let duration = duration.elapsed();
521 collectors
522 .duration
523 .record(duration.as_secs_f64(), &attributes);
524 }
525
526 if let Some(request_size) = self.request_size.take() {
527 let collectors = OTEL_COLLECTORS.get().unwrap();
528 collectors.request_size.record(request_size, &attributes);
529 }
530 }
531}
532
533impl Drop for OtelInfo {
534 fn drop(&mut self) {
535 let collectors = OTEL_COLLECTORS.get().unwrap();
536
537 self.handle_duration_and_request_size();
538
539 collectors
540 .active_requests
541 .add(-1, &self.attributes.for_counter());
542
543 if let Some(response_size) = self.response_size {
544 collectors
545 .response_size
546 .record(response_size, &self.attributes.for_histogram());
547 }
548 }
549}
550
551fn handle_error_otel(
552 otel: &Option<Rc<RefCell<Option<OtelInfo>>>>,
553 error: &HttpError,
554) {
555 if let Some(otel) = otel.as_ref() {
556 let mut maybe_otel_info = otel.borrow_mut();
557 if let Some(otel_info) = maybe_otel_info.as_mut() {
558 otel_info.attributes.error_type = Some(match error {
559 HttpError::Resource(_) => "resource",
560 HttpError::Canceled(_) => "canceled",
561 HttpError::HyperV014(_) => "hyper",
562 HttpError::InvalidHeaderName(_) => "invalid header name",
563 HttpError::InvalidHeaderValue(_) => "invalid header value",
564 HttpError::Http(_) => "http",
565 HttpError::ResponseHeadersAlreadySent => {
566 "response headers already sent"
567 }
568 HttpError::ConnectionClosedWhileSendingResponse => {
569 "connection closed while sending response"
570 }
571 HttpError::AlreadyInUse => "already in use",
572 HttpError::Io(_) => "io",
573 HttpError::NoResponseHeaders => "no response headers",
574 HttpError::ResponseAlreadyCompleted => "response already completed",
575 HttpError::UpgradeBodyUsed => "upgrade body used",
576 HttpError::Other(_) => "unknown",
577 });
578 }
579 }
580}
581
582struct HttpConnResource {
583 addr: HttpSocketAddr,
584 scheme: &'static str,
585 acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
586 closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper_v014::Error>>>>,
587 cancel_handle: Rc<CancelHandle>, }
589
590impl HttpConnResource {
591 fn new<S>(io: S, scheme: &'static str, addr: HttpSocketAddr) -> Self
592 where
593 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
594 {
595 let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>();
596 let service = HttpService::new(acceptors_rx);
597
598 let conn_fut = Http::new()
599 .with_executor(LocalExecutor)
600 .serve_connection(io, service)
601 .with_upgrades();
602
603 let cancel_handle = CancelHandle::new_rc();
607 let shutdown_fut = never().or_cancel(&cancel_handle).fuse();
608
609 let task_fut = async move {
611 let conn_fut = pin!(conn_fut);
612 let shutdown_fut = pin!(shutdown_fut);
613 let result = match select(conn_fut, shutdown_fut).await {
614 Either::Left((result, _)) => result,
615 Either::Right((_, mut conn_fut)) => {
616 conn_fut.as_mut().graceful_shutdown();
617 conn_fut.await
618 }
619 };
620 filter_enotconn(result).map_err(Arc::from)
621 };
622 let (task_fut, closed_fut) = task_fut.remote_handle();
623 let closed_fut = closed_fut.shared();
624 spawn(task_fut);
625
626 Self {
627 addr,
628 scheme,
629 acceptors_tx,
630 closed_fut,
631 cancel_handle,
632 }
633 }
634
635 async fn accept(
637 self: &Rc<Self>,
638 ) -> Result<
639 Option<(
640 HttpStreamReadResource,
641 HttpStreamWriteResource,
642 String,
643 String,
644 )>,
645 HttpError,
646 > {
647 let fut = async {
648 let (request_tx, request_rx) = oneshot::channel();
649 let (response_tx, response_rx) = oneshot::channel();
650
651 let otel_instant = OTEL_GLOBALS
652 .get()
653 .filter(|o| o.has_metrics())
654 .map(|_| std::time::Instant::now());
655
656 let acceptor = HttpAcceptor::new(request_tx, response_rx);
657 self.acceptors_tx.unbounded_send(acceptor).ok()?;
658
659 let request = request_rx.await.ok()?;
660 let accept_encoding = {
661 let encodings =
662 fly_accept_encoding::encodings_iter_http_02(request.headers())
663 .filter(|r| {
664 matches!(r, Ok((Some(Encoding::Brotli | Encoding::Gzip), _)))
665 });
666
667 fly_accept_encoding::preferred(encodings)
668 .ok()
669 .flatten()
670 .unwrap_or(Encoding::Identity)
671 };
672
673 let otel_info =
674 OTEL_GLOBALS.get().filter(|o| o.has_metrics()).map(|otel| {
675 let size_hint = request.size_hint();
676 Rc::new(RefCell::new(Some(OtelInfo::new(
677 otel,
678 otel_instant.unwrap(),
679 size_hint.upper().unwrap_or(size_hint.lower()),
680 OtelInfoAttributes {
681 http_request_method: OtelInfoAttributes::method_v02(
682 request.method(),
683 ),
684 url_scheme: Cow::Borrowed(self.scheme),
685 network_protocol_version: OtelInfoAttributes::version_v02(
686 request.version(),
687 ),
688 server_address: request.uri().host().map(|host| host.to_string()),
689 server_port: request.uri().port_u16().map(|port| port as i64),
690 error_type: Default::default(),
691 http_response_status_code: Default::default(),
692 },
693 ))))
694 });
695
696 let method = request.method().to_string();
697 let url = req_url(&request, self.scheme, &self.addr);
698 let read_stream =
699 HttpStreamReadResource::new(self, request, otel_info.clone());
700 let write_stream = HttpStreamWriteResource::new(
701 self,
702 response_tx,
703 accept_encoding,
704 otel_info,
705 );
706 Some((read_stream, write_stream, method, url))
707 };
708
709 async {
710 match fut.await {
711 Some(stream) => Ok(Some(stream)),
712 None => self.closed().map_ok(|_| None).await,
714 }
715 }
716 .try_or_cancel(&self.cancel_handle)
717 .await
718 }
719
720 async fn closed(&self) -> Result<(), HttpError> {
722 self.closed_fut.clone().map_err(HttpError::HyperV014).await
723 }
724}
725
726impl Resource for HttpConnResource {
727 fn name(&self) -> Cow<str> {
728 "httpConn".into()
729 }
730
731 fn close(self: Rc<Self>) {
732 self.cancel_handle.cancel();
733 }
734}
735
736pub fn http_create_conn_resource<S, A>(
738 state: &mut OpState,
739 io: S,
740 addr: A,
741 scheme: &'static str,
742) -> ResourceId
743where
744 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
745 A: Into<HttpSocketAddr>,
746{
747 let conn = HttpConnResource::new(io, scheme, addr.into());
748 state.resource_table.add(conn)
749}
750
751struct HttpService {
754 acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>,
755}
756
757impl HttpService {
758 fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self {
759 let acceptors_rx = acceptors_rx.peekable();
760 Self { acceptors_rx }
761 }
762}
763
764impl Service<Request<Body>> for HttpService {
765 type Response = Response<Body>;
766 type Error = oneshot::Canceled;
767 type Future = oneshot::Receiver<Response<Body>>;
768
769 fn poll_ready(
770 &mut self,
771 cx: &mut Context<'_>,
772 ) -> Poll<Result<(), Self::Error>> {
773 let acceptors_rx = Pin::new(&mut self.acceptors_rx);
774 let result = ready!(acceptors_rx.poll_peek(cx))
775 .map(|_| ())
776 .ok_or(oneshot::Canceled);
777 Poll::Ready(result)
778 }
779
780 fn call(&mut self, request: Request<Body>) -> Self::Future {
781 let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap();
782 acceptor.call(request)
783 }
784}
785
786struct HttpAcceptor {
790 request_tx: oneshot::Sender<Request<Body>>,
791 response_rx: oneshot::Receiver<Response<Body>>,
792}
793
794impl HttpAcceptor {
795 fn new(
796 request_tx: oneshot::Sender<Request<Body>>,
797 response_rx: oneshot::Receiver<Response<Body>>,
798 ) -> Self {
799 Self {
800 request_tx,
801 response_rx,
802 }
803 }
804
805 fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> {
806 let Self {
807 request_tx,
808 response_rx,
809 } = self;
810 request_tx
811 .send(request)
812 .map(|_| response_rx)
813 .unwrap_or_else(|_| oneshot::channel().1) }
815}
816
817pub struct HttpStreamReadResource {
818 _conn: Rc<HttpConnResource>,
819 pub rd: AsyncRefCell<HttpRequestReader>,
820 cancel_handle: CancelHandle,
821 size: SizeHint,
822 otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
823}
824
825pub struct HttpStreamWriteResource {
826 conn: Rc<HttpConnResource>,
827 wr: AsyncRefCell<HttpResponseWriter>,
828 accept_encoding: Encoding,
829 otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
830}
831
832impl HttpStreamReadResource {
833 fn new(
834 conn: &Rc<HttpConnResource>,
835 request: Request<Body>,
836 otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
837 ) -> Self {
838 let size = request.body().size_hint();
839 Self {
840 _conn: conn.clone(),
841 rd: HttpRequestReader::Headers(request).into(),
842 size,
843 cancel_handle: CancelHandle::new(),
844 otel_info,
845 }
846 }
847}
848
849impl Resource for HttpStreamReadResource {
850 fn name(&self) -> Cow<str> {
851 "httpReadStream".into()
852 }
853
854 fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
855 Box::pin(async move {
856 let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
857
858 let body = loop {
859 match &mut *rd {
860 HttpRequestReader::Headers(_) => {}
861 HttpRequestReader::Body(_, body) => break body,
862 HttpRequestReader::Closed => return Ok(BufView::empty()),
863 }
864 match take(&mut *rd) {
865 HttpRequestReader::Headers(request) => {
866 let (parts, body) = request.into_parts();
867 *rd = HttpRequestReader::Body(parts.headers, body.peekable());
868 }
869 _ => unreachable!(),
870 };
871 };
872
873 let fut = async {
874 let mut body = Pin::new(body);
875 loop {
876 match body.as_mut().peek_mut().await {
877 Some(Ok(chunk)) if !chunk.is_empty() => {
878 let len = min(limit, chunk.len());
879 let buf = chunk.split_to(len);
880 let view = BufView::from(buf);
881 break Ok(view);
882 }
883 Some(_) => match body.as_mut().next().await.unwrap() {
890 Ok(chunk) => assert!(chunk.is_empty()),
891 Err(err) => {
892 break Err(JsErrorBox::from_err(HttpError::HyperV014(
893 Arc::new(err),
894 )))
895 }
896 },
897 None => break Ok(BufView::empty()),
898 }
899 }
900 };
901
902 let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
903 fut.try_or_cancel(cancel_handle).await
904 })
905 }
906
907 fn close(self: Rc<Self>) {
908 self.cancel_handle.cancel();
909 }
910
911 fn size_hint(&self) -> (u64, Option<u64>) {
912 (self.size.lower(), self.size.upper())
913 }
914}
915
916impl HttpStreamWriteResource {
917 fn new(
918 conn: &Rc<HttpConnResource>,
919 response_tx: oneshot::Sender<Response<Body>>,
920 accept_encoding: Encoding,
921 otel_info: Option<Rc<RefCell<Option<OtelInfo>>>>,
922 ) -> Self {
923 Self {
924 conn: conn.clone(),
925 wr: HttpResponseWriter::Headers(response_tx).into(),
926 accept_encoding,
927 otel_info,
928 }
929 }
930}
931
932impl Resource for HttpStreamWriteResource {
933 fn name(&self) -> Cow<str> {
934 "httpWriteStream".into()
935 }
936}
937
938pub enum HttpRequestReader {
940 Headers(Request<Body>),
941 Body(HeaderMap<HeaderValue>, Peekable<Body>),
942 Closed,
943}
944
945impl Default for HttpRequestReader {
946 fn default() -> Self {
947 Self::Closed
948 }
949}
950
951enum HttpResponseWriter {
953 Headers(oneshot::Sender<Response<Body>>),
954 Body {
955 writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
956 shutdown_handle: ShutdownHandle,
957 },
958 BodyUncompressed(BodyUncompressedSender),
959 Closed,
960}
961
962impl Default for HttpResponseWriter {
963 fn default() -> Self {
964 Self::Closed
965 }
966}
967
968struct BodyUncompressedSender(Option<hyper_v014::body::Sender>);
969
970impl BodyUncompressedSender {
971 fn sender(&mut self) -> &mut hyper_v014::body::Sender {
972 self.0.as_mut().unwrap()
975 }
976
977 fn shutdown(mut self) {
978 self.0.take();
981 }
982}
983
984impl From<hyper_v014::body::Sender> for BodyUncompressedSender {
985 fn from(sender: hyper_v014::body::Sender) -> Self {
986 BodyUncompressedSender(Some(sender))
987 }
988}
989
990impl Drop for BodyUncompressedSender {
991 fn drop(&mut self) {
992 if let Some(sender) = self.0.take() {
993 sender.abort();
994 }
995 }
996}
997
998#[derive(Serialize)]
1000#[serde(rename_all = "camelCase")]
1001struct NextRequestResponse(
1002 ResourceId,
1004 ResourceId,
1006 String,
1010 String,
1012);
1013
1014#[op2(async)]
1015#[serde]
1016async fn op_http_accept(
1017 state: Rc<RefCell<OpState>>,
1018 #[smi] rid: ResourceId,
1019) -> Result<Option<NextRequestResponse>, HttpError> {
1020 let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
1021
1022 match conn.accept().await {
1023 Ok(Some((read_stream, write_stream, method, url))) => {
1024 let read_stream_rid = state
1025 .borrow_mut()
1026 .resource_table
1027 .add_rc(Rc::new(read_stream));
1028 let write_stream_rid = state
1029 .borrow_mut()
1030 .resource_table
1031 .add_rc(Rc::new(write_stream));
1032 let r =
1033 NextRequestResponse(read_stream_rid, write_stream_rid, method, url);
1034 Ok(Some(r))
1035 }
1036 Ok(None) => Ok(None),
1037 Err(err) => Err(err),
1038 }
1039}
1040
1041fn req_url(
1042 req: &hyper_v014::Request<hyper_v014::Body>,
1043 scheme: &'static str,
1044 addr: &HttpSocketAddr,
1045) -> String {
1046 let host: Cow<str> = match addr {
1047 HttpSocketAddr::IpSocket(addr) => {
1048 if let Some(auth) = req.uri().authority() {
1049 match addr.port() {
1050 443 if scheme == "https" => Cow::Borrowed(auth.host()),
1051 80 if scheme == "http" => Cow::Borrowed(auth.host()),
1052 _ => Cow::Borrowed(auth.as_str()), }
1054 } else if let Some(host) = req.uri().host() {
1055 Cow::Borrowed(host)
1056 } else if let Some(host) = req.headers().get("HOST") {
1057 match host.to_str() {
1058 Ok(host) => Cow::Borrowed(host),
1059 Err(_) => Cow::Owned(
1060 host
1061 .as_bytes()
1062 .iter()
1063 .cloned()
1064 .map(char::from)
1065 .collect::<String>(),
1066 ),
1067 }
1068 } else {
1069 Cow::Owned(addr.to_string())
1070 }
1071 }
1072 #[cfg(unix)]
1076 HttpSocketAddr::UnixSocket(addr) => Cow::Owned(
1077 percent_encoding::percent_encode(
1078 addr
1079 .as_pathname()
1080 .and_then(|x| x.to_str())
1081 .unwrap_or_default()
1082 .as_bytes(),
1083 percent_encoding::NON_ALPHANUMERIC,
1084 )
1085 .to_string(),
1086 ),
1087 };
1088 let path = req
1089 .uri()
1090 .path_and_query()
1091 .map(|p| p.as_str())
1092 .unwrap_or("/");
1093 [scheme, "://", &host, path].concat()
1094}
1095
1096fn req_headers(
1097 header_map: &HeaderMap<HeaderValue>,
1098) -> Vec<(ByteString, ByteString)> {
1099 let cookie_sep = "; ".as_bytes();
1104 let mut cookies = vec![];
1105
1106 let mut headers = Vec::with_capacity(header_map.len());
1107 for (name, value) in header_map.iter() {
1108 if name == hyper_v014::header::COOKIE {
1109 cookies.push(value.as_bytes());
1110 } else {
1111 let name: &[u8] = name.as_ref();
1112 let value = value.as_bytes();
1113 headers.push((name.into(), value.into()));
1114 }
1115 }
1116
1117 if !cookies.is_empty() {
1118 headers.push(("cookie".into(), cookies.join(cookie_sep).into()));
1119 }
1120
1121 headers
1122}
1123
1124#[op2(async)]
1125async fn op_http_write_headers(
1126 state: Rc<RefCell<OpState>>,
1127 #[smi] rid: u32,
1128 #[smi] status: u16,
1129 #[serde] headers: Vec<(ByteString, ByteString)>,
1130 #[serde] data: Option<StringOrBuffer>,
1131) -> Result<(), HttpError> {
1132 let stream = state
1133 .borrow_mut()
1134 .resource_table
1135 .get::<HttpStreamWriteResource>(rid)?;
1136
1137 let encoding = stream.accept_encoding;
1139
1140 let mut builder = Response::builder();
1141 let hmap = unsafe { builder.headers_mut().unwrap_unchecked() };
1143
1144 hmap.reserve(headers.len() + 2);
1146 for (k, v) in headers.into_iter() {
1147 let v: Vec<u8> = v.into();
1148 hmap.append(
1149 HeaderName::try_from(k.as_slice())?,
1150 HeaderValue::try_from(v)?,
1151 );
1152 }
1153 ensure_vary_accept_encoding(hmap);
1154
1155 let accepts_compression =
1156 matches!(encoding, Encoding::Brotli | Encoding::Gzip);
1157 let compressing = accepts_compression
1158 && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
1159 && should_compress(hmap);
1160
1161 if compressing {
1162 weaken_etag(hmap);
1163 hmap.remove(hyper_v014::header::CONTENT_LENGTH);
1165 hmap.insert(
1167 hyper_v014::header::CONTENT_ENCODING,
1168 HeaderValue::from_static(match encoding {
1169 Encoding::Brotli => "br",
1170 Encoding::Gzip => "gzip",
1171 _ => unreachable!(), }),
1173 );
1174 }
1175
1176 let (new_wr, body) = http_response(data, compressing, encoding)?;
1177 let body = builder.status(status).body(body)?;
1178
1179 if let Some(otel) = stream.otel_info.as_ref() {
1180 let mut otel = otel.borrow_mut();
1181 if let Some(otel_info) = otel.as_mut() {
1182 otel_info.attributes.http_response_status_code = Some(status as _);
1183 otel_info.handle_duration_and_request_size();
1184 }
1185 }
1186
1187 let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1188 let response_tx = match replace(&mut *old_wr, new_wr) {
1189 HttpResponseWriter::Headers(response_tx) => response_tx,
1190 _ => return Err(HttpError::ResponseHeadersAlreadySent),
1191 };
1192
1193 match response_tx.send(body) {
1194 Ok(_) => Ok(()),
1195 Err(_) => {
1196 stream.conn.closed().await?;
1197 Err(HttpError::ConnectionClosedWhileSendingResponse)
1198 }
1199 }
1200}
1201
1202#[op2]
1203#[serde]
1204fn op_http_headers(
1205 state: &mut OpState,
1206 #[smi] rid: u32,
1207) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
1208 let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
1209 let rd = RcRef::map(&stream, |r| &r.rd)
1210 .try_borrow()
1211 .ok_or(HttpError::AlreadyInUse)
1212 .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?;
1213 match &*rd {
1214 HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
1215 HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
1216 _ => unreachable!(),
1217 }
1218}
1219
1220fn http_response(
1221 data: Option<StringOrBuffer>,
1222 compressing: bool,
1223 encoding: Encoding,
1224) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
1225 const GZIP_DEFAULT_COMPRESSION_LEVEL: u8 = 1;
1229
1230 match data {
1231 Some(data) if compressing => match encoding {
1232 Encoding::Brotli => {
1233 let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
1239 writer.write_all(&data)?;
1240 Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
1241 }
1242 Encoding::Gzip => {
1243 let mut writer = GzEncoder::new(
1244 Vec::new(),
1245 Compression::new(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1246 );
1247 writer.write_all(&data)?;
1248 Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
1249 }
1250 _ => unreachable!(), },
1252 Some(data) => {
1253 Ok((HttpResponseWriter::Closed, data.to_vec().into()))
1256 }
1257 None if compressing => {
1258 let (a, b) = tokio::io::duplex(64 * 1024);
1262 let (reader, _) = tokio::io::split(a);
1263 let (_, writer) = tokio::io::split(b);
1264 let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
1265 Encoding::Brotli => {
1266 Box::pin(BrotliEncoder::with_quality(writer, Level::Fastest))
1267 }
1268 Encoding::Gzip => Box::pin(GzipEncoder::with_quality(
1269 writer,
1270 Level::Precise(GZIP_DEFAULT_COMPRESSION_LEVEL.into()),
1271 )),
1272 _ => unreachable!(), };
1274 let (stream, shutdown_handle) =
1275 ExternallyAbortableReaderStream::new(reader);
1276 Ok((
1277 HttpResponseWriter::Body {
1278 writer,
1279 shutdown_handle,
1280 },
1281 Body::wrap_stream(stream),
1282 ))
1283 }
1284 None => {
1285 let (body_tx, body_rx) = Body::channel();
1286 Ok((
1287 HttpResponseWriter::BodyUncompressed(body_tx.into()),
1288 body_rx,
1289 ))
1290 }
1291 }
1292}
1293
1294fn weaken_etag(hmap: &mut hyper_v014::HeaderMap) {
1297 if let Some(etag) = hmap.get_mut(hyper_v014::header::ETAG) {
1298 if !etag.as_bytes().starts_with(b"W/") {
1299 let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
1300 v.extend(b"W/");
1301 v.extend(etag.as_bytes());
1302 *etag = v.try_into().unwrap();
1303 }
1304 }
1305}
1306
1307fn ensure_vary_accept_encoding(hmap: &mut hyper_v014::HeaderMap) {
1312 if let Some(v) = hmap.get_mut(hyper_v014::header::VARY) {
1313 if let Ok(s) = v.to_str() {
1314 if !s.to_lowercase().contains("accept-encoding") {
1315 *v = format!("Accept-Encoding, {s}").try_into().unwrap()
1316 }
1317 return;
1318 }
1319 }
1320 hmap.insert(
1321 hyper_v014::header::VARY,
1322 HeaderValue::from_static("Accept-Encoding"),
1323 );
1324}
1325
1326fn should_compress(headers: &hyper_v014::HeaderMap) -> bool {
1327 fn cache_control_no_transform(
1329 headers: &hyper_v014::HeaderMap,
1330 ) -> Option<bool> {
1331 let v = headers.get(hyper_v014::header::CACHE_CONTROL)?;
1332 let s = match std::str::from_utf8(v.as_bytes()) {
1333 Ok(s) => s,
1334 Err(_) => return Some(true),
1335 };
1336 let c = CacheControl::from_value(s)?;
1337 Some(c.no_transform)
1338 }
1339 let content_range = headers.contains_key(hyper_v014::header::CONTENT_RANGE);
1343 let is_precompressed =
1345 headers.contains_key(hyper_v014::header::CONTENT_ENCODING);
1346
1347 !content_range
1348 && !is_precompressed
1349 && !cache_control_no_transform(headers).unwrap_or_default()
1350 && headers
1351 .get(hyper_v014::header::CONTENT_TYPE)
1352 .map(compressible::is_content_compressible)
1353 .unwrap_or_default()
1354}
1355
1356#[op2(async)]
1357async fn op_http_write_resource(
1358 state: Rc<RefCell<OpState>>,
1359 #[smi] rid: ResourceId,
1360 #[smi] stream: ResourceId,
1361) -> Result<(), HttpError> {
1362 let http_stream = state
1363 .borrow()
1364 .resource_table
1365 .get::<HttpStreamWriteResource>(rid)?;
1366 let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
1367 let resource = state.borrow().resource_table.get_any(stream)?;
1368 loop {
1369 match *wr {
1370 HttpResponseWriter::Headers(_) => {
1371 return Err(HttpError::NoResponseHeaders)
1372 }
1373 HttpResponseWriter::Closed => {
1374 return Err(HttpError::ResponseAlreadyCompleted)
1375 }
1376 _ => {}
1377 };
1378
1379 let view = resource.clone().read(64 * 1024).await?; if view.is_empty() {
1381 break;
1382 }
1383
1384 match &mut *wr {
1385 HttpResponseWriter::Body { writer, .. } => {
1386 let mut result = writer.write_all(&view).await;
1387 if result.is_ok() {
1388 result = writer.flush().await;
1389 }
1390 if let Err(err) = result {
1391 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1392 http_stream.conn.closed().await?;
1395 *wr = HttpResponseWriter::Closed;
1397 }
1398 }
1399 HttpResponseWriter::BodyUncompressed(body) => {
1400 let bytes = view.to_vec().into();
1401 if let Err(err) = body.sender().send_data(bytes).await {
1402 assert!(err.is_closed());
1403 http_stream.conn.closed().await?;
1405 *wr = HttpResponseWriter::Closed;
1407 }
1408 }
1409 _ => unreachable!(),
1410 };
1411 }
1412 Ok(())
1413}
1414
1415#[op2(async)]
1416async fn op_http_write(
1417 state: Rc<RefCell<OpState>>,
1418 #[smi] rid: ResourceId,
1419 #[buffer] buf: JsBuffer,
1420) -> Result<(), HttpError> {
1421 let stream = state
1422 .borrow()
1423 .resource_table
1424 .get::<HttpStreamWriteResource>(rid)?;
1425 let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1426
1427 if let Some(otel) = stream.otel_info.as_ref() {
1428 let mut maybe_otel_info = otel.borrow_mut();
1429 if let Some(otel_info) = maybe_otel_info.as_mut() {
1430 if let Some(response_size) = otel_info.response_size.as_mut() {
1431 *response_size += buf.len() as u64;
1432 }
1433 }
1434 }
1435
1436 match &mut *wr {
1437 HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
1438 HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
1439 HttpResponseWriter::Body { writer, .. } => {
1440 let mut result = writer.write_all(&buf).await;
1441 if result.is_ok() {
1442 result = writer.flush().await;
1443 }
1444 match result {
1445 Ok(_) => Ok(()),
1446 Err(err) => {
1447 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1448 stream.conn.closed().await?;
1451 *wr = HttpResponseWriter::Closed;
1453 Err(HttpError::ResponseAlreadyCompleted)
1454 }
1455 }
1456 }
1457 HttpResponseWriter::BodyUncompressed(body) => {
1458 let bytes = Bytes::from(buf.to_vec());
1459 match body.sender().send_data(bytes).await {
1460 Ok(_) => Ok(()),
1461 Err(err) => {
1462 assert!(err.is_closed());
1463 stream.conn.closed().await?;
1465 *wr = HttpResponseWriter::Closed;
1467 Err(HttpError::ResponseAlreadyCompleted)
1468 }
1469 }
1470 }
1471 }
1472}
1473
1474#[op2(async)]
1478async fn op_http_shutdown(
1479 state: Rc<RefCell<OpState>>,
1480 #[smi] rid: ResourceId,
1481) -> Result<(), HttpError> {
1482 let stream = state
1483 .borrow()
1484 .resource_table
1485 .get::<HttpStreamWriteResource>(rid)?;
1486 let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
1487 let wr = take(&mut *wr);
1488 match wr {
1489 HttpResponseWriter::Body {
1490 mut writer,
1491 shutdown_handle,
1492 } => {
1493 shutdown_handle.shutdown();
1494 match writer.shutdown().await {
1495 Ok(_) => {}
1496 Err(err) => {
1497 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1498 stream.conn.closed().await?;
1501 }
1502 }
1503 }
1504 HttpResponseWriter::BodyUncompressed(body) => {
1505 body.shutdown();
1506 }
1507 _ => {}
1508 };
1509 Ok(())
1510}
1511
1512#[op2]
1513#[string]
1514fn op_http_websocket_accept_header(#[string] key: String) -> String {
1515 let digest = ring::digest::digest(
1516 &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
1517 format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
1518 );
1519 BASE64_STANDARD.encode(digest)
1520}
1521
1522#[op2(async)]
1523#[smi]
1524async fn op_http_upgrade_websocket(
1525 state: Rc<RefCell<OpState>>,
1526 #[smi] rid: ResourceId,
1527) -> Result<ResourceId, HttpError> {
1528 let stream = state
1529 .borrow_mut()
1530 .resource_table
1531 .get::<HttpStreamReadResource>(rid)?;
1532 let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
1533
1534 let request = match &mut *rd {
1535 HttpRequestReader::Headers(request) => request,
1536 _ => {
1537 return Err(HttpError::UpgradeBodyUsed)
1538 .inspect_err(|e| handle_error_otel(&stream.otel_info, e))
1539 }
1540 };
1541
1542 let (transport, bytes) = extract_network_stream(
1543 hyper_v014::upgrade::on(request)
1544 .await
1545 .map_err(|err| HttpError::HyperV014(Arc::new(err)))
1546 .inspect_err(|e| handle_error_otel(&stream.otel_info, e))?,
1547 );
1548 Ok(ws_create_server_stream(
1549 &mut state.borrow_mut(),
1550 transport,
1551 bytes,
1552 ))
1553}
1554
1555#[derive(Clone)]
1557pub struct LocalExecutor;
1558
1559impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor
1560where
1561 Fut: Future + 'static,
1562 Fut::Output: 'static,
1563{
1564 fn execute(&self, fut: Fut) {
1565 deno_core::unsync::spawn(fut);
1566 }
1567}
1568
1569impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
1570where
1571 Fut: Future + 'static,
1572 Fut::Output: 'static,
1573{
1574 fn execute(&self, fut: Fut) {
1575 deno_core::unsync::spawn(fut);
1576 }
1577}
1578
1579fn filter_enotconn(
1581 result: Result<(), hyper_v014::Error>,
1582) -> Result<(), hyper_v014::Error> {
1583 if result
1584 .as_ref()
1585 .err()
1586 .and_then(|err| err.source())
1587 .and_then(|err| err.downcast_ref::<io::Error>())
1588 .filter(|err| err.kind() == io::ErrorKind::NotConnected)
1589 .is_some()
1590 {
1591 Ok(())
1592 } else {
1593 result
1594 }
1595}
1596
1597fn never() -> Pending<Never> {
1599 pending()
1600}
1601
1602trait CanDowncastUpgrade: Sized {
1603 fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1604 self,
1605 ) -> Result<(T, Bytes), Self>;
1606}
1607
1608impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
1609 fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1610 self,
1611 ) -> Result<(T, Bytes), Self> {
1612 let hyper::upgrade::Parts { io, read_buf, .. } =
1613 self.downcast::<TokioIo<T>>()?;
1614 Ok((io.into_inner(), read_buf))
1615 }
1616}
1617
1618impl CanDowncastUpgrade for hyper_v014::upgrade::Upgraded {
1619 fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
1620 self,
1621 ) -> Result<(T, Bytes), Self> {
1622 let hyper_v014::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
1623 Ok((io, read_buf))
1624 }
1625}
1626
1627fn maybe_extract_network_stream<
1628 T: Into<NetworkStream> + AsyncRead + AsyncWrite + Unpin + 'static,
1629 U: CanDowncastUpgrade,
1630>(
1631 upgraded: U,
1632) -> Result<(NetworkStream, Bytes), U> {
1633 let upgraded = match upgraded.downcast::<T>() {
1634 Ok((stream, bytes)) => return Ok((stream.into(), bytes)),
1635 Err(x) => x,
1636 };
1637
1638 match upgraded.downcast::<NetworkBufferedStream<T>>() {
1639 Ok((stream, upgraded_bytes)) => {
1640 let (io, stream_bytes) = stream.into_inner();
1642 let bytes = match (stream_bytes.is_empty(), upgraded_bytes.is_empty()) {
1643 (false, false) => Bytes::default(),
1644 (true, false) => upgraded_bytes,
1645 (false, true) => stream_bytes,
1646 (true, true) => {
1647 let mut v = upgraded_bytes.to_vec();
1649 v.append(&mut stream_bytes.to_vec());
1650 Bytes::from(v)
1651 }
1652 };
1653 Ok((io.into(), bytes))
1654 }
1655 Err(x) => Err(x),
1656 }
1657}
1658
1659fn extract_network_stream<U: CanDowncastUpgrade>(
1660 upgraded: U,
1661) -> (NetworkStream, Bytes) {
1662 let upgraded =
1663 match maybe_extract_network_stream::<tokio::net::TcpStream, _>(upgraded) {
1664 Ok(res) => return res,
1665 Err(x) => x,
1666 };
1667 let upgraded =
1668 match maybe_extract_network_stream::<deno_net::ops_tls::TlsStream, _>(
1669 upgraded,
1670 ) {
1671 Ok(res) => return res,
1672 Err(x) => x,
1673 };
1674 #[cfg(unix)]
1675 let upgraded =
1676 match maybe_extract_network_stream::<tokio::net::UnixStream, _>(upgraded) {
1677 Ok(res) => return res,
1678 Err(x) => x,
1679 };
1680 let upgraded =
1681 match maybe_extract_network_stream::<NetworkStream, _>(upgraded) {
1682 Ok(res) => return res,
1683 Err(x) => x,
1684 };
1685
1686 drop(upgraded);
1688 unreachable!("unexpected stream type");
1689}