1use crate::common::{HttpHooks, Profile, RunCommon, RunTarget};
2use bytes::Bytes;
3use clap::Parser;
4use http::{HeaderMap, HeaderName, HeaderValue, Response, StatusCode};
5use http_body_util::combinators::UnsyncBoxBody;
6use http_body_util::{BodyExt as _, Full};
7use hyper::server::conn::http1;
8use pin_project_lite::pin_project;
9use std::convert::Infallible;
10use std::ffi::OsString;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::{
15 path::PathBuf,
16 sync::{
17 Arc, Mutex,
18 atomic::{AtomicBool, AtomicU64, Ordering},
19 },
20 time::{Duration, Instant},
21};
22use tokio::io::{self, AsyncWrite};
23use tokio::sync::{Notify, Semaphore};
24use wasmtime::component::{Component, GuestTaskId, Linker};
25use wasmtime::error::Context as _;
26use wasmtime::{
27 AsContextMut as _, Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail,
28};
29use wasmtime_cli_flags::opt::WasmtimeOptionValue;
30use wasmtime_wasi::p2::{StreamError, StreamResult};
31use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
32use wasmtime_wasi_http::WasiHttpCtx;
33use wasmtime_wasi_http::handler::{
34 self, HandlerState, Instance, Prepared, Proxy, ProxyHandler, ProxyPre, ShouldAccept, ViewFn,
35 WorkerExpiration, WorkerState, WorkerStatus,
36};
37use wasmtime_wasi_http::io::TokioIo;
38
39#[cfg(feature = "debug")]
40use crate::commands::run::RunCommand;
41
42#[cfg(feature = "wasi-config")]
43use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
44#[cfg(feature = "wasi-keyvalue")]
45use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
46#[cfg(feature = "wasi-nn")]
47use wasmtime_wasi_nn::wit::WasiNnCtx;
48
49const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
50const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
51const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
52
53struct Host {
54 table: wasmtime::component::ResourceTable,
55 ctx: WasiCtx,
56 http: WasiHttpCtx,
57 hooks: HttpHooks,
58
59 limits: StoreLimits,
60
61 #[cfg(feature = "wasi-nn")]
62 nn: Option<WasiNnCtx>,
63
64 #[cfg(feature = "wasi-config")]
65 wasi_config: Option<WasiConfigVariables>,
66
67 #[cfg(feature = "wasi-keyvalue")]
68 wasi_keyvalue: Option<WasiKeyValueCtx>,
69
70 #[cfg(feature = "profiling")]
71 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
72
73 write_profile: Option<WriteProfile>,
74}
75
76impl WasiView for Host {
77 fn ctx(&mut self) -> WasiCtxView<'_> {
78 WasiCtxView {
79 ctx: &mut self.ctx,
80 table: &mut self.table,
81 }
82 }
83}
84
85impl wasmtime_wasi_http::p2::WasiHttpView for Host {
86 fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
87 wasmtime_wasi_http::p2::WasiHttpCtxView {
88 ctx: &mut self.http,
89 table: &mut self.table,
90 hooks: &mut self.hooks,
91 }
92 }
93}
94
95#[cfg(feature = "component-model-async")]
96impl wasmtime_wasi_http::p3::WasiHttpView for Host {
97 fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
98 wasmtime_wasi_http::p3::WasiHttpCtxView {
99 table: &mut self.table,
100 ctx: &mut self.http,
101 hooks: &mut self.hooks,
102 }
103 }
104}
105
106const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
107 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
108 8080,
109);
110
111fn parse_duration(s: &str) -> Result<Duration, String> {
112 Duration::parse(Some(s)).map_err(|e| e.to_string())
113}
114
115#[derive(Parser)]
117pub struct ServeCommand {
118 #[command(flatten)]
119 run: RunCommon,
120
121 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
123 addr: SocketAddr,
124
125 #[arg(long, value_name = "SOCKADDR")]
130 shutdown_addr: Option<SocketAddr>,
131
132 #[arg(long)]
135 no_logging_prefix: bool,
136
137 #[arg(value_name = "WASM", required = true)]
139 component: PathBuf,
140
141 #[arg(long)]
146 max_instance_reuse_count: Option<usize>,
147
148 #[arg(long)]
155 max_instance_concurrent_reuse_count: Option<usize>,
156
157 #[arg(long, default_value = "1s", value_parser = parse_duration)]
164 idle_instance_timeout: Duration,
165
166 #[arg(short = 'H', long = "header", value_name = "HEADER")]
172 headers: Vec<String>,
173
174 #[arg(long)]
177 max_concurrent_requests: Option<usize>,
178 #[arg(long)]
181 max_concurrent_connections: Option<usize>,
182}
183
184impl ServeCommand {
185 pub fn execute(mut self) -> Result<()> {
187 self.run.common.init_logging()?;
188
189 if self.run.common.wasi.nn == Some(true) {
193 #[cfg(not(feature = "wasi-nn"))]
194 {
195 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
196 }
197 }
198
199 if self.run.common.wasi.threads == Some(true) {
200 bail!("wasi-threads does not support components yet")
201 }
202
203 if self.run.common.wasi.http.replace(true) == Some(false) {
206 bail!("wasi-http is required for the serve command, and must not be disabled");
207 }
208 if self.run.common.wasm.component_model.replace(true) == Some(false) {
209 bail!("components are required for the serve command, and must not be disabled");
210 }
211
212 let runtime = tokio::runtime::Builder::new_multi_thread()
213 .enable_time()
214 .enable_io()
215 .build()?;
216
217 runtime.block_on(self.serve())?;
218
219 Ok(())
220 }
221
222 #[cfg(feature = "debug")]
225 fn debugger_setup(&mut self) -> Result<Option<RunCommand>> {
226 fn set_implicit_option(
227 place: &str,
228 name: &str,
229 setting: &mut Option<bool>,
230 value: bool,
231 ) -> Result<()> {
232 if *setting == Some(!value) {
233 bail!(
234 "Explicitly-set option on {place} {name}={} is not compatible \
235 with debugging-implied setting {value}",
236 setting.unwrap()
237 );
238 }
239 *setting = Some(value);
240 Ok(())
241 }
242
243 #[cfg(feature = "gdbstub")]
244 let override_bytes = if let Some(addr) = self.run.gdbstub.as_deref() {
245 if self.run.common.debug.debugger.is_some() {
246 bail!("-g/--gdb cannot be combined with -Ddebugger=");
247 }
248 let addr = if addr.parse::<u16>().is_ok() {
249 format!("127.0.0.1:{addr}")
250 } else {
251 use std::net::SocketAddr as SA;
252 addr.parse::<SA>()
253 .with_context(|| format!("invalid gdbstub address: `{addr}`"))?;
254 addr.to_string()
255 };
256 self.run.common.debug.debugger = Some("<built-in gdbstub>".into());
257 self.run.common.debug.arg.push(addr);
258 Some(gdbstub_component_artifact::GDBSTUB_COMPONENT)
259 } else {
260 None
261 };
262 #[cfg(not(feature = "gdbstub"))]
263 let override_bytes = None;
264
265 if let Some(debugger_component_path) = self.run.common.debug.debugger.as_ref() {
266 set_implicit_option(
267 "debuggee",
268 "guest_debug",
269 &mut self.run.common.debug.guest_debug,
270 true,
271 )?;
272 set_implicit_option(
273 "debuggee",
274 "epoch_interruption",
275 &mut self.run.common.wasm.epoch_interruption,
276 true,
277 )?;
278
279 let mut debugger_run = RunCommand::try_parse_from(
280 ["run".into(), debugger_component_path.into()]
281 .into_iter()
282 .chain(self.run.common.debug.arg.iter().map(OsString::from)),
283 )?;
284 debugger_run.module_bytes = override_bytes;
285
286 debugger_run.run.common.wasi.tcp.get_or_insert(true);
287 debugger_run
288 .run
289 .common
290 .wasi
291 .inherit_network
292 .get_or_insert(true);
293
294 set_implicit_option(
295 "debugger",
296 "inherit_stdin",
297 &mut debugger_run.run.common.wasi.inherit_stdin,
298 self.run.common.debug.inherit_stdin.unwrap_or(false),
299 )?;
300 set_implicit_option(
301 "debugger",
302 "inherit_stdout",
303 &mut debugger_run.run.common.wasi.inherit_stdout,
304 self.run.common.debug.inherit_stdout.unwrap_or(false),
305 )?;
306 set_implicit_option(
307 "debugger",
308 "inherit_stderr",
309 &mut debugger_run.run.common.wasi.inherit_stderr,
310 self.run.common.debug.inherit_stderr.unwrap_or(false),
311 )?;
312 Ok(Some(debugger_run))
313 } else {
314 Ok(None)
315 }
316 }
317
318 #[cfg(feature = "debug")]
323 async fn serve_under_debugger(
324 self,
325 mut debug_run: RunCommand,
326 linker: Linker<Host>,
327 component: Component,
328 ) -> Result<()> {
329 let mut debuggee_store = self.new_store(linker.engine(), None)?;
330
331 debuggee_store.debug_register_component(&component)?;
334
335 let debug_engine = debug_run.new_engine()?;
336 let debug_main = debug_run.run.load_module(
337 &debug_engine,
338 debug_run.module_and_args[0].as_ref(),
339 debug_run.module_bytes.as_ref().map(|v| &v[..]),
340 )?;
341 let (mut debug_store, debug_linker) =
342 debug_run.new_store_and_linker(&debug_engine, &debug_main)?;
343 let debug_component = match debug_main {
344 RunTarget::Core(_) => {
345 bail!("Debugger component is a core module; only components are supported")
346 }
347 RunTarget::Component(c) => c,
348 };
349 let mut debug_linker = match debug_linker {
350 crate::commands::run::CliLinker::Core(_) => unreachable!(),
351 crate::commands::run::CliLinker::Component(l) => l,
352 };
353 debug_run.add_debugger_api(&mut debug_linker)?;
354
355 debug_run
356 .invoke_debugger(
357 &mut debug_store,
358 &debug_component,
359 &mut debug_linker,
360 debuggee_store,
361 move |store| Box::pin(self.serve_maybe_debug(linker, component, Some(store))),
362 )
363 .await
364 }
365
366 fn new_store(&self, engine: &Engine, instance_id: Option<u64>) -> Result<Store<Host>> {
367 let mut builder = WasiCtxBuilder::new();
368 self.run.configure_wasip2(&mut builder)?;
369
370 if let Some(instance_id) = instance_id {
371 builder.env("INSTANCE_ID", instance_id.to_string());
372 }
373
374 let stdout_prefix: String;
375 let stderr_prefix: String;
376 match instance_id {
377 Some(instance_id) if !self.no_logging_prefix => {
378 stdout_prefix = format!("stdout [{instance_id}] :: ");
379 stderr_prefix = format!("stderr [{instance_id}] :: ");
380 }
381 _ => {
382 stdout_prefix = "".to_string();
383 stderr_prefix = "".to_string();
384 }
385 }
386 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
387 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
388
389 let mut table = wasmtime::component::ResourceTable::new();
390 if let Some(max) = self.run.common.wasi.max_resources {
391 table.set_max_capacity(max);
392 }
393 let mut host = Host {
394 table,
395 ctx: builder.build(),
396 http: self.run.wasi_http_ctx()?,
397 hooks: self.run.wasi_http_hooks(),
398
399 limits: StoreLimits::default(),
400
401 #[cfg(feature = "wasi-nn")]
402 nn: None,
403 #[cfg(feature = "wasi-config")]
404 wasi_config: None,
405 #[cfg(feature = "wasi-keyvalue")]
406 wasi_keyvalue: None,
407 #[cfg(feature = "profiling")]
408 guest_profiler: None,
409 write_profile: None,
410 };
411
412 if self.run.common.wasi.nn == Some(true) {
413 #[cfg(feature = "wasi-nn")]
414 {
415 let graphs = self
416 .run
417 .common
418 .wasi
419 .nn_graph
420 .iter()
421 .map(|g| (g.format.clone(), g.dir.clone()))
422 .collect::<Vec<_>>();
423 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
424 host.nn.replace(WasiNnCtx::new(backends, registry));
425 }
426 }
427
428 if self.run.common.wasi.config == Some(true) {
429 #[cfg(feature = "wasi-config")]
430 {
431 let vars = WasiConfigVariables::from_iter(
432 self.run
433 .common
434 .wasi
435 .config_var
436 .iter()
437 .map(|v| (v.key.clone(), v.value.clone())),
438 );
439 host.wasi_config.replace(vars);
440 }
441 }
442
443 if self.run.common.wasi.keyvalue == Some(true) {
444 #[cfg(feature = "wasi-keyvalue")]
445 {
446 let ctx = WasiKeyValueCtxBuilder::new()
447 .in_memory_data(
448 self.run
449 .common
450 .wasi
451 .keyvalue_in_memory_data
452 .iter()
453 .map(|v| (v.key.clone(), v.value.clone())),
454 )
455 .build();
456 host.wasi_keyvalue.replace(ctx);
457 }
458 }
459
460 let mut store = Store::new(engine, host);
461
462 if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
463 store.set_hostcall_fuel(fuel);
464 }
465
466 store.data_mut().limits = self.run.store_limits();
467 store.limiter(|t| &mut t.limits);
468
469 if let Some(fuel) = self.run.common.wasm.fuel {
472 store.set_fuel(fuel)?;
473 }
474
475 Ok(store)
476 }
477
478 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
479 self.run.validate_p3_option()?;
480 let cli = self.run.validate_cli_enabled()?;
481
482 if cli == Some(true) {
491 self.run.add_wasmtime_wasi_to_linker(linker)?;
492 wasmtime_wasi_http::p2::add_only_http_to_linker_async(linker)?;
493 #[cfg(feature = "component-model-async")]
494 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
495 wasmtime_wasi_http::p3::add_to_linker(linker)?;
496 }
497 } else {
498 wasmtime_wasi_http::p2::add_to_linker_async(linker)?;
499 #[cfg(feature = "component-model-async")]
500 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
501 wasmtime_wasi_http::p3::add_to_linker(linker)?;
502 wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
503 wasmtime_wasi::p3::random::add_to_linker(linker)?;
504 wasmtime_wasi::p3::cli::add_to_linker(linker)?;
505 }
506 }
507
508 if self.run.common.wasi.nn == Some(true) {
509 #[cfg(not(feature = "wasi-nn"))]
510 {
511 bail!("support for wasi-nn was disabled at compile time");
512 }
513 #[cfg(feature = "wasi-nn")]
514 {
515 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
516 let ctx = h.nn.as_mut().unwrap();
517 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
518 })?;
519 }
520 }
521
522 if self.run.common.wasi.config == Some(true) {
523 #[cfg(not(feature = "wasi-config"))]
524 {
525 bail!("support for wasi-config was disabled at compile time");
526 }
527 #[cfg(feature = "wasi-config")]
528 {
529 wasmtime_wasi_config::add_to_linker(linker, |h| {
530 WasiConfig::from(h.wasi_config.as_ref().unwrap())
531 })?;
532 }
533 }
534
535 if self.run.common.wasi.keyvalue == Some(true) {
536 #[cfg(not(feature = "wasi-keyvalue"))]
537 {
538 bail!("support for wasi-keyvalue was disabled at compile time");
539 }
540 #[cfg(feature = "wasi-keyvalue")]
541 {
542 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
543 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
544 })?;
545 }
546 }
547
548 if self.run.common.wasi.threads == Some(true) {
549 bail!("support for wasi-threads is not available with components");
550 }
551
552 if self.run.common.wasi.http == Some(false) {
553 bail!("support for wasi-http must be enabled for `serve` subcommand");
554 }
555
556 Ok(())
557 }
558
559 async fn serve(mut self) -> Result<()> {
560 #[cfg(feature = "debug")]
561 let debug_run = self.debugger_setup()?;
562
563 let mut config = self
564 .run
565 .common
566 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
567 config.wasm_component_model(true);
568
569 if self.run.common.wasm.timeout.is_some() {
570 config.epoch_interruption(true);
571 }
572
573 match self.run.profile {
574 Some(Profile::Native(s)) => {
575 config.profiler(s);
576 }
577 Some(Profile::Guest { .. }) => {
578 config.epoch_interruption(true);
579 }
580 None => {}
581 }
582
583 let engine = Engine::new(&config)?;
584 let mut linker = Linker::new(&engine);
585
586 self.add_to_linker(&mut linker)?;
587
588 let component = match self.run.load_module(&engine, &self.component, None)? {
589 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
590 RunTarget::Component(c) => c,
591 };
592
593 #[cfg(feature = "debug")]
594 if let Some(debug_run) = debug_run {
595 return self
596 .serve_under_debugger(debug_run, linker, component)
597 .await;
598 }
599
600 self.serve_maybe_debug(linker, component, None).await
601 }
602
603 async fn serve_maybe_debug(
604 self,
605 linker: Linker<Host>,
606 component: Component,
607 mut debuggee_store: Option<&mut Store<Host>>,
608 ) -> Result<()> {
609 let engine = linker.engine();
610 let request_headers = RequestHeaders::parse(&self.headers)?;
611 let instance = linker.instantiate_pre(&component)?;
612 #[cfg(feature = "component-model-async")]
613 let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
614 Ok(pre) => ProxyPre::P3(pre),
615 Err(_) => ProxyPre::P2(wasmtime_wasi_http::p2::bindings::ProxyPre::new(instance)?),
616 };
617 #[cfg(not(feature = "component-model-async"))]
618 let instance = ProxyPre::P2(wasmtime_wasi_http::p2::bindings::ProxyPre::new(instance)?);
619
620 let shutdown = Arc::new(GracefulShutdown::default());
624 tokio::task::spawn({
625 let shutdown = shutdown.clone();
626 async move {
627 tokio::signal::ctrl_c().await.unwrap();
628 shutdown.requested.notify_one();
629 }
630 });
631 if let Some(addr) = self.shutdown_addr {
632 let listener = tokio::net::TcpListener::bind(addr).await?;
633 eprintln!(
634 "Listening for shutdown on tcp://{}/",
635 listener.local_addr()?
636 );
637 let shutdown = shutdown.clone();
638 tokio::task::spawn(async move {
639 let _ = listener.accept().await;
640 shutdown.requested.notify_one();
641 });
642 }
643
644 let socket = match &self.addr {
645 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
646 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
647 };
648 socket.set_reuseaddr(!cfg!(windows))?;
657 socket.bind(self.addr)?;
658 let listener = socket.listen(100)?;
659
660 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
661
662 log::info!("Listening on {}", self.addr);
663
664 let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
665 Some(interval)
666 } else if let Some(t) = self.run.common.wasm.timeout {
667 Some(EPOCH_INTERRUPT_PERIOD.min(t))
668 } else if debuggee_store.is_some() {
669 Some(Duration::from_millis(1))
670 } else {
671 None
672 };
673 let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));
674
675 let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
676 if let ProxyPre::P3(_) = &instance {
677 DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
678 } else {
679 DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
680 }
681 });
682
683 let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
684 self.max_instance_concurrent_reuse_count
685 .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
686 } else {
687 1
688 };
689
690 let max_concurrent_connections = self
691 .max_concurrent_connections
692 .unwrap_or(if debuggee_store.is_some() { 1 } else { 1000 });
693 let max_concurrent_requests = self
694 .max_concurrent_requests
695 .unwrap_or(if debuggee_store.is_some() { 1 } else { 1000 });
696 if debuggee_store.is_some() && max_concurrent_connections != 1 {
697 bail!("cannot have more than 1 max concurrent connections with a debugger");
698 }
699 if debuggee_store.is_some() && max_concurrent_requests != 1 {
700 bail!("cannot have more than 1 max concurrent requests with a debugger");
701 }
702
703 let sem_connections = Arc::new(Semaphore::new(max_concurrent_connections));
704
705 let handler = ProxyHandler::new(HostHandlerState {
706 sem_requests: Semaphore::new(max_concurrent_requests),
707 cmd: self,
708 component,
709 request_headers,
710 max_instance_reuse_count,
711 max_instance_concurrent_reuse_count,
712 instance,
713 next_instance_id: AtomicU64::default(),
714 next_request_id: AtomicU64::default(),
715 _shutdown_guard: Box::new(shutdown.clone().increment()),
718 });
719
720 loop {
721 let (connection_permit, stream) = tokio::select! {
725 _ = shutdown.requested.notified() => break,
726 v = async {
727 let permit = sem_connections.clone().acquire_owned().await?;
728 let (stream, _) = listener.accept().await?;
729 wasmtime::error::Ok((permit, stream))
730 } => v?,
731 };
732
733 stream.set_nodelay(true)?;
739
740 let shutdown_guard = shutdown.clone().increment();
744
745 match &mut debuggee_store {
749 Some(store) => {
750 handle_client(stream, &handler, Some(store)).await;
751 }
752 None => {
753 let handler = handler.clone();
754 tokio::task::spawn(async move {
755 handle_client(stream, &handler, None).await;
756 drop(shutdown_guard);
757 drop(connection_permit);
758 });
759 }
760 }
761 }
762
763 handler.state().sem_requests.close();
765
766 drop(handler);
767
768 if shutdown.close() {
774 return Ok(());
775 }
776 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
777 tokio::select! {
778 _ = tokio::signal::ctrl_c() => {}
779 _ = shutdown.complete.notified() => {}
780 }
781
782 Ok(())
783 }
784}
785
786pin_project! {
787 struct HostWorkerExpiration {
788 idle_timeout: Duration,
789 request_timeout: Duration,
790 #[pin]
791 sleep: tokio::time::Sleep,
792 }
793}
794
795impl WorkerExpiration for HostWorkerExpiration {
796 fn poll(
797 self: Pin<&mut Self>,
798 cx: &mut Context<'_>,
799 status: WorkerStatus,
800 start: Instant,
801 ) -> Poll<()> {
802 let mut me = self.project();
803
804 let timeout = match status {
805 WorkerStatus::Idle => *me.idle_timeout,
806 WorkerStatus::Requests | WorkerStatus::PostReturn => *me.request_timeout,
810 };
811
812 if let Some(deadline) = start.checked_add(timeout) {
813 let deadline = deadline.into();
814 if deadline != me.sleep.deadline() {
815 me.sleep.as_mut().reset(deadline);
816 }
817 me.sleep.poll(cx)
818 } else {
819 Poll::Pending
820 }
821 }
822}
823
824struct HostWorkerState {
825 instance_id: u64,
826 max_instance_reuse_count: usize,
827 max_instance_concurrent_reuse_count: usize,
828 request_timeout: Duration,
829}
830
831impl WorkerState for HostWorkerState {
832 type StoreData = Host;
833 type RequestId = u64;
834
835 fn should_accept_request(&self, concurrent_count: usize, total_count: usize) -> ShouldAccept {
836 if total_count >= self.max_instance_reuse_count {
837 ShouldAccept::Never
838 } else if concurrent_count >= self.max_instance_concurrent_reuse_count {
839 ShouldAccept::No
840 } else {
841 ShouldAccept::Yes
842 }
843 }
844
845 fn on_request_start(
846 &self,
847 _store: StoreContextMut<Host>,
848 request_id: u64,
849 _task_id: GuestTaskId,
850 ) -> Pin<Box<dyn Future<Output = ()> + 'static + Send + Sync>> {
851 log::info!(
852 "Instance {} handling request {request_id}",
853 self.instance_id,
854 );
855
856 Box::pin(tokio::time::sleep(self.request_timeout))
857 }
858
859 fn drop(&self, mut store: Store<Self::StoreData>, result: Result<(), wasmtime::Error>) {
860 if let Err(error) = result {
861 eprintln!("worker failed: {error:?}");
862 }
863
864 if let Some(write_profile) = store.data_mut().write_profile.take() {
865 write_profile(store.as_context_mut());
866 }
867
868 drop(store);
869 }
870}
871
872struct HostHandlerState {
873 cmd: ServeCommand,
874 component: Component,
875 request_headers: RequestHeaders,
876 max_instance_reuse_count: usize,
877 max_instance_concurrent_reuse_count: usize,
878 instance: ProxyPre<Host>,
879 next_instance_id: AtomicU64,
880 next_request_id: AtomicU64,
881 sem_requests: Semaphore,
882 _shutdown_guard: Box<dyn std::any::Any + Send + Sync>,
883}
884
885impl HostHandlerState {
886 async fn instantiate_into(&self, store: &mut Store<Host>) -> Result<Proxy> {
887 let write_profile = setup_epoch_handler(&self.cmd, &mut *store, self.component.clone())?;
888 store.data_mut().write_profile = Some(write_profile);
889 self.instance.instantiate_async(&mut *store).await
890 }
891
892 fn view(&self) -> ViewFn<Host> {
893 match &self.instance {
894 ProxyPre::P2(_) => ViewFn::P2(wasmtime_wasi_http::p2::WasiHttpView::http),
895 ProxyPre::P3(_) => ViewFn::P3(wasmtime_wasi_http::p3::WasiHttpView::http),
896 }
897 }
898}
899
900impl HandlerState for HostHandlerState {
901 type StoreData = Host;
902 type WorkerExpiration = HostWorkerExpiration;
903 type WorkerState = HostWorkerState;
904
905 async fn instantiate(
906 &self,
907 ) -> Result<Instance<Self::StoreData, Self::WorkerExpiration, Self::WorkerState>> {
908 let instance_id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
909 let mut store = self
910 .cmd
911 .new_store(self.component.engine(), Some(instance_id))?;
912 let proxy = self.instantiate_into(&mut store).await?;
913
914 Ok(Instance {
915 store,
916 proxy,
917 view: self.view(),
918 expiration: HostWorkerExpiration {
919 idle_timeout: self.cmd.idle_instance_timeout,
920 request_timeout: self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX),
921 sleep: tokio::time::sleep(Duration::MAX),
922 },
923 state: HostWorkerState {
924 max_instance_reuse_count: self.max_instance_reuse_count,
925 max_instance_concurrent_reuse_count: self.max_instance_concurrent_reuse_count,
926 instance_id,
927 request_timeout: self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX),
928 },
929 })
930 }
931}
932
933#[derive(Default)]
935struct GracefulShutdown {
936 requested: Notify,
938 complete: Notify,
941 state: Mutex<GracefulShutdownState>,
943}
944
945#[derive(Default)]
946struct GracefulShutdownState {
947 active_tasks: u32,
948 notify_when_done: bool,
949}
950
951impl GracefulShutdown {
952 fn increment(self: Arc<Self>) -> impl Drop + Send + Sync {
954 struct Guard(Arc<GracefulShutdown>);
955
956 let mut state = self.state.lock().unwrap();
957 assert!(!state.notify_when_done);
958 state.active_tasks += 1;
959 drop(state);
960
961 return Guard(self);
962
963 impl Drop for Guard {
964 fn drop(&mut self) {
965 let mut state = self.0.state.lock().unwrap();
966 state.active_tasks -= 1;
967 if state.notify_when_done && state.active_tasks == 0 {
968 self.0.complete.notify_one();
969 }
970 }
971 }
972 }
973
974 fn close(&self) -> bool {
977 let mut state = self.state.lock().unwrap();
978 state.notify_when_done = true;
979 state.active_tasks == 0
980 }
981}
982
983const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
987
988struct EpochThread {
989 shutdown: Arc<AtomicBool>,
990 handle: Option<std::thread::JoinHandle<()>>,
991}
992
993impl EpochThread {
994 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
995 let shutdown = Arc::new(AtomicBool::new(false));
996 let handle = {
997 let shutdown = Arc::clone(&shutdown);
998 let handle = std::thread::spawn(move || {
999 while !shutdown.load(Ordering::Relaxed) {
1000 std::thread::sleep(interval);
1001 engine.increment_epoch();
1002 }
1003 });
1004 Some(handle)
1005 };
1006
1007 EpochThread { shutdown, handle }
1008 }
1009}
1010
1011impl Drop for EpochThread {
1012 fn drop(&mut self) {
1013 if let Some(handle) = self.handle.take() {
1014 self.shutdown.store(true, Ordering::Relaxed);
1015 handle.join().unwrap();
1016 }
1017 }
1018}
1019
1020type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
1021
1022fn setup_epoch_handler(
1023 cmd: &ServeCommand,
1024 store: &mut Store<Host>,
1025 component: Component,
1026) -> Result<WriteProfile> {
1027 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
1029 #[cfg(feature = "profiling")]
1030 return setup_guest_profiler(store, path.clone(), *interval, component.clone());
1031 #[cfg(not(feature = "profiling"))]
1032 {
1033 let _ = (path, interval);
1034 bail!("support for profiling disabled at compile time!");
1035 }
1036 }
1037
1038 if cmd.run.common.wasm.timeout.is_some() || cmd.run.common.debug.debugger.is_some() {
1040 store.epoch_deadline_async_yield_and_update(1);
1041 }
1042
1043 Ok(Box::new(|_store| {}))
1044}
1045
1046#[cfg(feature = "profiling")]
1047fn setup_guest_profiler(
1048 store: &mut Store<Host>,
1049 path: String,
1050 interval: Duration,
1051 component: Component,
1052) -> Result<WriteProfile> {
1053 use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
1054
1055 let module_name = "<main>";
1056
1057 store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
1058 store.engine(),
1059 module_name,
1060 interval,
1061 component,
1062 std::iter::empty(),
1063 )?));
1064
1065 fn sample(
1066 mut store: StoreContextMut<Host>,
1067 f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
1068 ) {
1069 let mut profiler = store.data_mut().guest_profiler.take().unwrap();
1070 f(
1071 Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
1072 store.as_context(),
1073 );
1074 store.data_mut().guest_profiler = Some(profiler);
1075 }
1076
1077 store.call_hook(|store, kind| {
1079 sample(store, |profiler, store| profiler.call_hook(store, kind));
1080 Ok(())
1081 });
1082
1083 store.epoch_deadline_callback(move |store| {
1084 sample(store, |profiler, store| {
1085 profiler.sample(store, std::time::Duration::ZERO)
1086 });
1087
1088 Ok(UpdateDeadline::Continue(1))
1089 });
1090
1091 store.set_epoch_deadline(1);
1092
1093 let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
1094 let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
1095 .expect("profiling doesn't support threads yet");
1096 if let Err(e) = std::fs::File::create(&path)
1097 .map_err(wasmtime::Error::new)
1098 .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
1099 {
1100 eprintln!("failed writing profile at {path}: {e:#}");
1101 } else {
1102 eprintln!();
1103 eprintln!("Profile written to: {path}");
1104 eprintln!("View this profile at https://profiler.firefox.com/.");
1105 }
1106 });
1107
1108 Ok(write_profile)
1109}
1110
1111type Request = hyper::Request<hyper::body::Incoming>;
1112
1113async fn handle_client(
1114 client: tokio::net::TcpStream,
1115 handler: &ProxyHandler<HostHandlerState>,
1116 debuggee_store: Option<&mut Store<Host>>,
1117) {
1118 let lock = &debuggee_store.map(tokio::sync::Mutex::new);
1123
1124 if let Err(e) = http1::Builder::new()
1125 .keep_alive(true)
1126 .serve_connection(
1127 TokioIo::new(client),
1128 hyper::service::service_fn(move |req| async move {
1129 let mut debuggee_store = match &lock {
1130 Some(store) => Some(store.lock().await),
1131 None => None,
1132 };
1133 let debuggee_store = debuggee_store.as_mut().map(|s| &mut ***s);
1134 match handle_request(handler, debuggee_store, req).await {
1135 Ok(r) => Ok::<_, Infallible>(r),
1136 Err(e) => {
1137 eprintln!("error: {e:?}");
1138 let error_html = "\
1139<!doctype html>
1140<html>
1141<head>
1142 <title>500 Internal Server Error</title>
1143</head>
1144<body>
1145 <center>
1146 <h1>500 Internal Server Error</h1>
1147 <hr>
1148 wasmtime
1149 </center>
1150</body>
1151</html>";
1152 Ok(Response::builder()
1153 .status(StatusCode::INTERNAL_SERVER_ERROR)
1154 .header("Content-Type", "text/html; charset=UTF-8")
1155 .body(
1156 Full::new(bytes::Bytes::from(error_html))
1157 .map_err(|_| unreachable!())
1158 .boxed_unsync(),
1159 )
1160 .unwrap())
1161 }
1162 }
1163 }),
1164 )
1165 .await
1166 {
1167 eprintln!("error: {e:?}");
1168 }
1169}
1170
1171async fn handle_request(
1172 handler: &ProxyHandler<HostHandlerState>,
1173 debuggee_store: Option<&mut Store<Host>>,
1174 mut req: Request,
1175) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
1176 use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode;
1177
1178 let _request_permit = handler.state().sem_requests.acquire().await?;
1182
1183 handler.state().request_headers.apply(req.headers_mut());
1184
1185 let request_id = handler
1186 .state()
1187 .next_request_id
1188 .fetch_add(1, Ordering::Relaxed);
1189 log::info!(
1190 "Received request {request_id}: {} {}",
1191 req.method(),
1192 req.uri()
1193 );
1194
1195 let req = req.map(|body| {
1196 body.map_err(ErrorCode::from_hyper_request_error)
1197 .map_err(handler::ErrorCode::from)
1198 .boxed_unsync()
1199 });
1200
1201 match debuggee_store {
1202 Some(store) => {
1206 let instance = handler.state().instantiate_into(store).await?;
1207 let (tx, rx) = futures::channel::oneshot::channel();
1208 let prepared = Prepared::new(
1209 store.as_context_mut(),
1210 &instance,
1211 req,
1212 handler.state().view(),
1213 tx,
1214 )?;
1215 store
1216 .run_concurrent(async |store| prepared.run(store, std::future::pending()).await)
1217 .await??;
1218 rx.await?
1219 }
1220
1221 None => handler.handle(request_id, req).await,
1223 }
1224}
1225
1226#[derive(Clone, Default)]
1227struct RequestHeaders {
1228 entries: Vec<(HeaderName, HeaderValue)>,
1229}
1230
1231impl RequestHeaders {
1232 fn parse(headers: &[String]) -> Result<Self> {
1233 let mut entries = Vec::new();
1234 for header in headers {
1235 if let Some(path) = header.strip_prefix('@') {
1236 let contents = std::fs::read_to_string(path)
1237 .with_context(|| format!("failed to read header file `{path}`"))?;
1238 for line in contents.lines().filter(|line| !line.trim().is_empty()) {
1239 entries.push(parse_header(line)?);
1240 }
1241 } else {
1242 entries.push(parse_header(header)?);
1243 }
1244 }
1245 Ok(Self { entries })
1246 }
1247
1248 fn apply(&self, headers: &mut HeaderMap) {
1249 for name in self.entries.iter().map(|(name, _)| name) {
1252 headers.remove(name);
1253 }
1254 for (name, value) in &self.entries {
1255 headers.append(name, value.clone());
1256 }
1257 }
1258}
1259
1260fn parse_header(header: &str) -> Result<(HeaderName, HeaderValue)> {
1261 let (name, value) = header
1262 .split_once(':')
1263 .with_context(|| format!("header `{header}` is missing `:`"))?;
1264 let name = HeaderName::from_bytes(name.trim().as_bytes())
1265 .with_context(|| format!("invalid header name in header `{header}`"))?;
1266 let value = HeaderValue::from_str(value.trim_start())
1267 .with_context(|| format!("invalid header value in header `{header}`"))?;
1268 Ok((name, value))
1269}
1270
1271#[derive(Clone)]
1272enum Output {
1273 Stdout,
1274 Stderr,
1275}
1276
1277impl Output {
1278 fn write_all(&self, buf: &[u8]) -> io::Result<()> {
1279 use std::io::Write;
1280
1281 match self {
1282 Output::Stdout => std::io::stdout().write_all(buf),
1283 Output::Stderr => std::io::stderr().write_all(buf),
1284 }
1285 }
1286}
1287
1288#[derive(Clone)]
1289struct LogStream {
1290 output: Output,
1291 state: Arc<LogStreamState>,
1292}
1293
1294struct LogStreamState {
1295 prefix: String,
1296 needs_prefix_on_next_write: AtomicBool,
1297}
1298
1299impl LogStream {
1300 fn new(prefix: String, output: Output) -> LogStream {
1301 LogStream {
1302 output,
1303 state: Arc::new(LogStreamState {
1304 prefix,
1305 needs_prefix_on_next_write: AtomicBool::new(true),
1306 }),
1307 }
1308 }
1309
1310 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
1311 while !bytes.is_empty() {
1312 if self
1313 .state
1314 .needs_prefix_on_next_write
1315 .load(Ordering::Relaxed)
1316 {
1317 self.output.write_all(self.state.prefix.as_bytes())?;
1318 self.state
1319 .needs_prefix_on_next_write
1320 .store(false, Ordering::Relaxed);
1321 }
1322 match bytes.iter().position(|b| *b == b'\n') {
1323 Some(i) => {
1324 let (a, b) = bytes.split_at(i + 1);
1325 bytes = b;
1326 self.output.write_all(a)?;
1327 self.state
1328 .needs_prefix_on_next_write
1329 .store(true, Ordering::Relaxed);
1330 }
1331 None => {
1332 self.output.write_all(bytes)?;
1333 break;
1334 }
1335 }
1336 }
1337
1338 Ok(())
1339 }
1340}
1341
1342impl wasmtime_wasi::cli::StdoutStream for LogStream {
1343 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
1344 Box::new(self.clone())
1345 }
1346 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
1347 Box::new(self.clone())
1348 }
1349}
1350
1351impl wasmtime_wasi::cli::IsTerminal for LogStream {
1352 fn is_terminal(&self) -> bool {
1353 match &self.output {
1354 Output::Stdout => std::io::stdout().is_terminal(),
1355 Output::Stderr => std::io::stderr().is_terminal(),
1356 }
1357 }
1358}
1359
1360impl wasmtime_wasi::p2::OutputStream for LogStream {
1361 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1362 self.write_all(&bytes)
1363 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1364 Ok(())
1365 }
1366
1367 fn flush(&mut self) -> StreamResult<()> {
1368 Ok(())
1369 }
1370
1371 fn check_write(&mut self) -> StreamResult<usize> {
1372 Ok(1024 * 1024)
1373 }
1374}
1375
1376#[async_trait::async_trait]
1377impl wasmtime_wasi::p2::Pollable for LogStream {
1378 async fn ready(&mut self) {}
1379}
1380
1381impl AsyncWrite for LogStream {
1382 fn poll_write(
1383 mut self: Pin<&mut Self>,
1384 _cx: &mut Context<'_>,
1385 buf: &[u8],
1386 ) -> Poll<io::Result<usize>> {
1387 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1388 }
1389 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1390 Poll::Ready(Ok(()))
1391 }
1392 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1393 Poll::Ready(Ok(()))
1394 }
1395}
1396
1397fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1419 use wasmtime::{Config, Memory, MemoryType};
1420 const BITS_TO_TEST: u32 = 42;
1421 let mut config = Config::new();
1422 config.wasm_memory64(true);
1423 config.memory_reservation(1 << BITS_TO_TEST);
1424 let engine = Engine::new(&config)?;
1425 let mut store = Store::new(&engine, ());
1426 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1429 if Memory::new(&mut store, ty).is_ok() {
1430 Ok(Some(true))
1431 } else {
1432 Ok(None)
1433 }
1434}