1use crate::common::{HttpHooks, Profile, RunCommon, RunTarget};
2use bytes::Bytes;
3use clap::Parser;
4use futures::future::FutureExt;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::UnsyncBoxBody;
8use hyper::body::{Body, Frame, SizeHint};
9use std::convert::Infallible;
10use std::ffi::OsString;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::{
15 path::PathBuf,
16 sync::{
17 Arc, Mutex,
18 atomic::{AtomicBool, Ordering},
19 },
20 time::Duration,
21};
22use tokio::io::{self, AsyncWrite};
23use tokio::sync::Notify;
24use wasmtime::component::{Component, Linker};
25use wasmtime::{
26 Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail, error::Context as _,
27};
28use wasmtime_cli_flags::opt::WasmtimeOptionValue;
29use wasmtime_wasi::p2::{StreamError, StreamResult};
30use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
31#[cfg(feature = "component-model-async")]
32use wasmtime_wasi_http::handler::p2::bindings as p2;
33use wasmtime_wasi_http::handler::{HandlerState, Proxy, ProxyHandler, ProxyPre, StoreBundle};
34use wasmtime_wasi_http::io::TokioIo;
35use wasmtime_wasi_http::{WasiHttpCtx, p2::WasiHttpView};
36
37#[cfg(feature = "debug")]
38use crate::commands::run::RunCommand;
39
40#[cfg(feature = "wasi-config")]
41use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
42#[cfg(feature = "wasi-keyvalue")]
43use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
44#[cfg(feature = "wasi-nn")]
45use wasmtime_wasi_nn::wit::WasiNnCtx;
46
47const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
48const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
49const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
50
51struct Host {
52 table: wasmtime::component::ResourceTable,
53 ctx: WasiCtx,
54 http: WasiHttpCtx,
55 hooks: HttpHooks,
56
57 limits: StoreLimits,
58
59 #[cfg(feature = "wasi-nn")]
60 nn: Option<WasiNnCtx>,
61
62 #[cfg(feature = "wasi-config")]
63 wasi_config: Option<WasiConfigVariables>,
64
65 #[cfg(feature = "wasi-keyvalue")]
66 wasi_keyvalue: Option<WasiKeyValueCtx>,
67
68 #[cfg(feature = "profiling")]
69 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
70}
71
72impl WasiView for Host {
73 fn ctx(&mut self) -> WasiCtxView<'_> {
74 WasiCtxView {
75 ctx: &mut self.ctx,
76 table: &mut self.table,
77 }
78 }
79}
80
81impl wasmtime_wasi_http::p2::WasiHttpView for Host {
82 fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
83 wasmtime_wasi_http::p2::WasiHttpCtxView {
84 ctx: &mut self.http,
85 table: &mut self.table,
86 hooks: &mut self.hooks,
87 }
88 }
89}
90
91#[cfg(feature = "component-model-async")]
92impl wasmtime_wasi_http::p3::WasiHttpView for Host {
93 fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
94 wasmtime_wasi_http::p3::WasiHttpCtxView {
95 table: &mut self.table,
96 ctx: &mut self.http,
97 hooks: &mut self.hooks,
98 }
99 }
100}
101
102const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
103 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
104 8080,
105);
106
107fn parse_duration(s: &str) -> Result<Duration, String> {
108 Duration::parse(Some(s)).map_err(|e| e.to_string())
109}
110
111#[derive(Parser)]
113pub struct ServeCommand {
114 #[command(flatten)]
115 run: RunCommon,
116
117 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
119 addr: SocketAddr,
120
121 #[arg(long, value_name = "SOCKADDR")]
126 shutdown_addr: Option<SocketAddr>,
127
128 #[arg(long)]
131 no_logging_prefix: bool,
132
133 #[arg(value_name = "WASM", required = true)]
135 component: PathBuf,
136
137 #[arg(long)]
142 max_instance_reuse_count: Option<usize>,
143
144 #[arg(long)]
151 max_instance_concurrent_reuse_count: Option<usize>,
152
153 #[arg(long, default_value = "1s", value_parser = parse_duration)]
160 idle_instance_timeout: Duration,
161}
162
163impl ServeCommand {
164 pub fn execute(mut self) -> Result<()> {
166 self.run.common.init_logging()?;
167
168 if self.run.common.wasi.nn == Some(true) {
172 #[cfg(not(feature = "wasi-nn"))]
173 {
174 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
175 }
176 }
177
178 if self.run.common.wasi.threads == Some(true) {
179 bail!("wasi-threads does not support components yet")
180 }
181
182 if self.run.common.wasi.http.replace(true) == Some(false) {
185 bail!("wasi-http is required for the serve command, and must not be disabled");
186 }
187 if self.run.common.wasm.component_model.replace(true) == Some(false) {
188 bail!("components are required for the serve command, and must not be disabled");
189 }
190
191 let runtime = tokio::runtime::Builder::new_multi_thread()
192 .enable_time()
193 .enable_io()
194 .build()?;
195
196 runtime.block_on(self.serve())?;
197
198 Ok(())
199 }
200
201 #[cfg(feature = "debug")]
204 fn debugger_setup(&mut self) -> Result<Option<RunCommand>> {
205 fn set_implicit_option(
206 place: &str,
207 name: &str,
208 setting: &mut Option<bool>,
209 value: bool,
210 ) -> Result<()> {
211 if *setting == Some(!value) {
212 bail!(
213 "Explicitly-set option on {place} {name}={} is not compatible \
214 with debugging-implied setting {value}",
215 setting.unwrap()
216 );
217 }
218 *setting = Some(value);
219 Ok(())
220 }
221
222 #[cfg(feature = "gdbstub")]
223 let override_bytes = if let Some(addr) = self.run.gdbstub.as_deref() {
224 if self.run.common.debug.debugger.is_some() {
225 bail!("-g/--gdb cannot be combined with -Ddebugger=");
226 }
227 let addr = if addr.parse::<u16>().is_ok() {
228 format!("127.0.0.1:{addr}")
229 } else {
230 use std::net::SocketAddr as SA;
231 addr.parse::<SA>()
232 .with_context(|| format!("invalid gdbstub address: `{addr}`"))?;
233 addr.to_string()
234 };
235 self.run.common.debug.debugger = Some("<built-in gdbstub>".into());
236 self.run.common.debug.arg.push(addr);
237 Some(gdbstub_component_artifact::GDBSTUB_COMPONENT)
238 } else {
239 None
240 };
241 #[cfg(not(feature = "gdbstub"))]
242 let override_bytes = None;
243
244 if let Some(debugger_component_path) = self.run.common.debug.debugger.as_ref() {
245 set_implicit_option(
246 "debuggee",
247 "guest_debug",
248 &mut self.run.common.debug.guest_debug,
249 true,
250 )?;
251 set_implicit_option(
252 "debuggee",
253 "epoch_interruption",
254 &mut self.run.common.wasm.epoch_interruption,
255 true,
256 )?;
257
258 let mut debugger_run = RunCommand::try_parse_from(
259 ["run".into(), debugger_component_path.into()]
260 .into_iter()
261 .chain(self.run.common.debug.arg.iter().map(OsString::from)),
262 )?;
263 debugger_run.module_bytes = override_bytes;
264
265 debugger_run.run.common.wasi.tcp.get_or_insert(true);
266 debugger_run
267 .run
268 .common
269 .wasi
270 .inherit_network
271 .get_or_insert(true);
272
273 set_implicit_option(
274 "debugger",
275 "inherit_stdin",
276 &mut debugger_run.run.common.wasi.inherit_stdin,
277 self.run.common.debug.inherit_stdin.unwrap_or(false),
278 )?;
279 set_implicit_option(
280 "debugger",
281 "inherit_stdout",
282 &mut debugger_run.run.common.wasi.inherit_stdout,
283 self.run.common.debug.inherit_stdout.unwrap_or(false),
284 )?;
285 set_implicit_option(
286 "debugger",
287 "inherit_stderr",
288 &mut debugger_run.run.common.wasi.inherit_stderr,
289 self.run.common.debug.inherit_stderr.unwrap_or(false),
290 )?;
291 Ok(Some(debugger_run))
292 } else {
293 Ok(None)
294 }
295 }
296
297 #[cfg(feature = "debug")]
302 async fn serve_under_debugger(
303 &self,
304 mut debug_run: RunCommand,
305 engine: &Engine,
306 linker: &Linker<Host>,
307 component: &Component,
308 ) -> Result<()> {
309 let instance_pre = linker.instantiate_pre(component)?;
310 let proxy_pre = wasmtime_wasi_http::p2::bindings::ProxyPre::new(instance_pre)?;
311
312 let mut debuggee_store = self.new_store(engine, None)?;
313
314 debuggee_store.debug_register_component(component)?;
317
318 let debug_engine = debug_run.new_engine()?;
319 let debug_main = debug_run.run.load_module(
320 &debug_engine,
321 debug_run.module_and_args[0].as_ref(),
322 debug_run.module_bytes.as_ref().map(|v| &v[..]),
323 )?;
324 let (mut debug_store, debug_linker) =
325 debug_run.new_store_and_linker(&debug_engine, &debug_main)?;
326 let debug_component = match debug_main {
327 RunTarget::Core(_) => {
328 bail!("Debugger component is a core module; only components are supported")
329 }
330 RunTarget::Component(c) => c,
331 };
332 let mut debug_linker = match debug_linker {
333 crate::commands::run::CliLinker::Core(_) => unreachable!(),
334 crate::commands::run::CliLinker::Component(l) => l,
335 };
336 debug_run.add_debugger_api(&mut debug_linker)?;
337
338 let addr = self.addr;
339 debug_run
340 .invoke_debugger(
341 &mut debug_store,
342 &debug_component,
343 &mut debug_linker,
344 debuggee_store,
345 move |store| Box::pin(debug_serve_body(store, proxy_pre, addr)),
346 )
347 .await
348 }
349
350 fn new_store(&self, engine: &Engine, req_id: Option<u64>) -> Result<Store<Host>> {
351 let mut builder = WasiCtxBuilder::new();
352 self.run.configure_wasip2(&mut builder)?;
353
354 if let Some(req_id) = req_id {
355 builder.env("REQUEST_ID", req_id.to_string());
356 }
357
358 let stdout_prefix: String;
359 let stderr_prefix: String;
360 match req_id {
361 Some(req_id) if !self.no_logging_prefix => {
362 stdout_prefix = format!("stdout [{req_id}] :: ");
363 stderr_prefix = format!("stderr [{req_id}] :: ");
364 }
365 _ => {
366 stdout_prefix = "".to_string();
367 stderr_prefix = "".to_string();
368 }
369 }
370 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
371 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
372
373 let mut table = wasmtime::component::ResourceTable::new();
374 if let Some(max) = self.run.common.wasi.max_resources {
375 table.set_max_capacity(max);
376 }
377 let mut host = Host {
378 table,
379 ctx: builder.build(),
380 http: self.run.wasi_http_ctx()?,
381 hooks: self.run.wasi_http_hooks(),
382
383 limits: StoreLimits::default(),
384
385 #[cfg(feature = "wasi-nn")]
386 nn: None,
387 #[cfg(feature = "wasi-config")]
388 wasi_config: None,
389 #[cfg(feature = "wasi-keyvalue")]
390 wasi_keyvalue: None,
391 #[cfg(feature = "profiling")]
392 guest_profiler: None,
393 };
394
395 if self.run.common.wasi.nn == Some(true) {
396 #[cfg(feature = "wasi-nn")]
397 {
398 let graphs = self
399 .run
400 .common
401 .wasi
402 .nn_graph
403 .iter()
404 .map(|g| (g.format.clone(), g.dir.clone()))
405 .collect::<Vec<_>>();
406 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
407 host.nn.replace(WasiNnCtx::new(backends, registry));
408 }
409 }
410
411 if self.run.common.wasi.config == Some(true) {
412 #[cfg(feature = "wasi-config")]
413 {
414 let vars = WasiConfigVariables::from_iter(
415 self.run
416 .common
417 .wasi
418 .config_var
419 .iter()
420 .map(|v| (v.key.clone(), v.value.clone())),
421 );
422 host.wasi_config.replace(vars);
423 }
424 }
425
426 if self.run.common.wasi.keyvalue == Some(true) {
427 #[cfg(feature = "wasi-keyvalue")]
428 {
429 let ctx = WasiKeyValueCtxBuilder::new()
430 .in_memory_data(
431 self.run
432 .common
433 .wasi
434 .keyvalue_in_memory_data
435 .iter()
436 .map(|v| (v.key.clone(), v.value.clone())),
437 )
438 .build();
439 host.wasi_keyvalue.replace(ctx);
440 }
441 }
442
443 let mut store = Store::new(engine, host);
444
445 if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
446 store.set_hostcall_fuel(fuel);
447 }
448
449 store.data_mut().limits = self.run.store_limits();
450 store.limiter(|t| &mut t.limits);
451
452 if let Some(fuel) = self.run.common.wasm.fuel {
455 store.set_fuel(fuel)?;
456 }
457
458 Ok(store)
459 }
460
461 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
462 self.run.validate_p3_option()?;
463 let cli = self.run.validate_cli_enabled()?;
464
465 if cli == Some(true) {
474 self.run.add_wasmtime_wasi_to_linker(linker)?;
475 wasmtime_wasi_http::p2::add_only_http_to_linker_async(linker)?;
476 #[cfg(feature = "component-model-async")]
477 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
478 wasmtime_wasi_http::p3::add_to_linker(linker)?;
479 }
480 } else {
481 wasmtime_wasi_http::p2::add_to_linker_async(linker)?;
482 #[cfg(feature = "component-model-async")]
483 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
484 wasmtime_wasi_http::p3::add_to_linker(linker)?;
485 wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
486 wasmtime_wasi::p3::random::add_to_linker(linker)?;
487 wasmtime_wasi::p3::cli::add_to_linker(linker)?;
488 }
489 }
490
491 if self.run.common.wasi.nn == Some(true) {
492 #[cfg(not(feature = "wasi-nn"))]
493 {
494 bail!("support for wasi-nn was disabled at compile time");
495 }
496 #[cfg(feature = "wasi-nn")]
497 {
498 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
499 let ctx = h.nn.as_mut().unwrap();
500 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
501 })?;
502 }
503 }
504
505 if self.run.common.wasi.config == Some(true) {
506 #[cfg(not(feature = "wasi-config"))]
507 {
508 bail!("support for wasi-config was disabled at compile time");
509 }
510 #[cfg(feature = "wasi-config")]
511 {
512 wasmtime_wasi_config::add_to_linker(linker, |h| {
513 WasiConfig::from(h.wasi_config.as_ref().unwrap())
514 })?;
515 }
516 }
517
518 if self.run.common.wasi.keyvalue == Some(true) {
519 #[cfg(not(feature = "wasi-keyvalue"))]
520 {
521 bail!("support for wasi-keyvalue was disabled at compile time");
522 }
523 #[cfg(feature = "wasi-keyvalue")]
524 {
525 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
526 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
527 })?;
528 }
529 }
530
531 if self.run.common.wasi.threads == Some(true) {
532 bail!("support for wasi-threads is not available with components");
533 }
534
535 if self.run.common.wasi.http == Some(false) {
536 bail!("support for wasi-http must be enabled for `serve` subcommand");
537 }
538
539 Ok(())
540 }
541
542 async fn serve(mut self) -> Result<()> {
543 use hyper::server::conn::http1;
544
545 #[cfg(feature = "debug")]
546 let debug_run = self.debugger_setup()?;
547
548 let mut config = self
549 .run
550 .common
551 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
552 config.wasm_component_model(true);
553
554 if self.run.common.wasm.timeout.is_some() {
555 config.epoch_interruption(true);
556 }
557
558 match self.run.profile {
559 Some(Profile::Native(s)) => {
560 config.profiler(s);
561 }
562 Some(Profile::Guest { .. }) => {
563 config.epoch_interruption(true);
564 }
565 None => {}
566 }
567
568 let engine = Engine::new(&config)?;
569 let mut linker = Linker::new(&engine);
570
571 self.add_to_linker(&mut linker)?;
572
573 let component = match self.run.load_module(&engine, &self.component, None)? {
574 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
575 RunTarget::Component(c) => c,
576 };
577
578 #[cfg(feature = "debug")]
579 if let Some(debug_run) = debug_run {
580 return self
581 .serve_under_debugger(debug_run, &engine, &linker, &component)
582 .await;
583 }
584
585 let instance = linker.instantiate_pre(&component)?;
586 #[cfg(feature = "component-model-async")]
587 let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
588 Ok(pre) => ProxyPre::P3(pre),
589 Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
590 };
591 #[cfg(not(feature = "component-model-async"))]
592 let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
593
594 let shutdown = Arc::new(GracefulShutdown::default());
598 tokio::task::spawn({
599 let shutdown = shutdown.clone();
600 async move {
601 tokio::signal::ctrl_c().await.unwrap();
602 shutdown.requested.notify_one();
603 }
604 });
605 if let Some(addr) = self.shutdown_addr {
606 let listener = tokio::net::TcpListener::bind(addr).await?;
607 eprintln!(
608 "Listening for shutdown on tcp://{}/",
609 listener.local_addr()?
610 );
611 let shutdown = shutdown.clone();
612 tokio::task::spawn(async move {
613 let _ = listener.accept().await;
614 shutdown.requested.notify_one();
615 });
616 }
617
618 let socket = match &self.addr {
619 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
620 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
621 };
622 socket.set_reuseaddr(!cfg!(windows))?;
631 socket.bind(self.addr)?;
632 let listener = socket.listen(100)?;
633
634 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
635
636 log::info!("Listening on {}", self.addr);
637
638 let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
639 Some(interval)
640 } else if let Some(t) = self.run.common.wasm.timeout {
641 Some(EPOCH_INTERRUPT_PERIOD.min(t))
642 } else {
643 None
644 };
645 let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));
646
647 let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
648 if let ProxyPre::P3(_) = &instance {
649 DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
650 } else {
651 DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
652 }
653 });
654
655 let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
656 self.max_instance_concurrent_reuse_count
657 .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
658 } else {
659 1
660 };
661
662 let handler = ProxyHandler::new(
663 HostHandlerState {
664 cmd: self,
665 engine,
666 component,
667 max_instance_reuse_count,
668 max_instance_concurrent_reuse_count,
669 _shutdown_guard: Box::new(shutdown.clone().increment()),
672 },
673 instance,
674 );
675
676 loop {
677 let (stream, _) = tokio::select! {
681 _ = shutdown.requested.notified() => break,
682 v = listener.accept() => v?,
683 };
684
685 stream.set_nodelay(true)?;
691
692 let stream = TokioIo::new(stream);
693 let h = handler.clone();
694
695 let shutdown_guard = shutdown.clone().increment();
699 tokio::task::spawn(async move {
700 if let Err(e) = http1::Builder::new()
701 .keep_alive(true)
702 .serve_connection(
703 stream,
704 hyper::service::service_fn(move |req| {
705 let h = h.clone();
706 async move {
707 use http_body_util::{BodyExt, Full};
708 match handle_request(h, req).await {
709 Ok(r) => Ok::<_, Infallible>(r),
710 Err(e) => {
711 eprintln!("error: {e:?}");
712 let error_html = "\
713<!doctype html>
714<html>
715<head>
716 <title>500 Internal Server Error</title>
717</head>
718<body>
719 <center>
720 <h1>500 Internal Server Error</h1>
721 <hr>
722 wasmtime
723 </center>
724</body>
725</html>";
726 Ok(Response::builder()
727 .status(StatusCode::INTERNAL_SERVER_ERROR)
728 .header("Content-Type", "text/html; charset=UTF-8")
729 .body(
730 Full::new(bytes::Bytes::from(error_html))
731 .map_err(|_| unreachable!())
732 .boxed_unsync(),
733 )
734 .unwrap())
735 }
736 }
737 }
738 }),
739 )
740 .await
741 {
742 eprintln!("error: {e:?}");
743 }
744 drop(shutdown_guard);
745 });
746 }
747
748 drop(handler);
749
750 if shutdown.close() {
756 return Ok(());
757 }
758 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
759 tokio::select! {
760 _ = tokio::signal::ctrl_c() => {}
761 _ = shutdown.complete.notified() => {}
762 }
763
764 Ok(())
765 }
766}
767
768struct HostHandlerState {
769 cmd: ServeCommand,
770 engine: Engine,
771 component: Component,
772 max_instance_reuse_count: usize,
773 max_instance_concurrent_reuse_count: usize,
774 _shutdown_guard: Box<dyn std::any::Any + Send + Sync>,
775}
776
777impl HandlerState for HostHandlerState {
778 type StoreData = Host;
779
780 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Host>> {
781 let mut store = self.cmd.new_store(&self.engine, req_id)?;
782 let write_profile = setup_epoch_handler(&self.cmd, &mut store, self.component.clone())?;
783
784 Ok(StoreBundle {
785 store,
786 write_profile,
787 })
788 }
789
790 fn request_timeout(&self) -> Duration {
791 self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX)
792 }
793
794 fn idle_instance_timeout(&self) -> Duration {
795 self.cmd.idle_instance_timeout
796 }
797
798 fn max_instance_reuse_count(&self) -> usize {
799 self.max_instance_reuse_count
800 }
801
802 fn max_instance_concurrent_reuse_count(&self) -> usize {
803 self.max_instance_concurrent_reuse_count
804 }
805
806 fn handle_worker_error(&self, error: wasmtime::Error) {
807 eprintln!("worker error: {error}");
808 }
809}
810
811#[derive(Default)]
813struct GracefulShutdown {
814 requested: Notify,
816 complete: Notify,
819 state: Mutex<GracefulShutdownState>,
821}
822
823#[derive(Default)]
824struct GracefulShutdownState {
825 active_tasks: u32,
826 notify_when_done: bool,
827}
828
829impl GracefulShutdown {
830 fn increment(self: Arc<Self>) -> impl Drop + Send + Sync {
832 struct Guard(Arc<GracefulShutdown>);
833
834 let mut state = self.state.lock().unwrap();
835 assert!(!state.notify_when_done);
836 state.active_tasks += 1;
837 drop(state);
838
839 return Guard(self);
840
841 impl Drop for Guard {
842 fn drop(&mut self) {
843 let mut state = self.0.state.lock().unwrap();
844 state.active_tasks -= 1;
845 if state.notify_when_done && state.active_tasks == 0 {
846 self.0.complete.notify_one();
847 }
848 }
849 }
850 }
851
852 fn close(&self) -> bool {
855 let mut state = self.state.lock().unwrap();
856 state.notify_when_done = true;
857 state.active_tasks == 0
858 }
859}
860
861const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
865
866struct EpochThread {
867 shutdown: Arc<AtomicBool>,
868 handle: Option<std::thread::JoinHandle<()>>,
869}
870
871impl EpochThread {
872 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
873 let shutdown = Arc::new(AtomicBool::new(false));
874 let handle = {
875 let shutdown = Arc::clone(&shutdown);
876 let handle = std::thread::spawn(move || {
877 while !shutdown.load(Ordering::Relaxed) {
878 std::thread::sleep(interval);
879 engine.increment_epoch();
880 }
881 });
882 Some(handle)
883 };
884
885 EpochThread { shutdown, handle }
886 }
887}
888
889impl Drop for EpochThread {
890 fn drop(&mut self) {
891 if let Some(handle) = self.handle.take() {
892 self.shutdown.store(true, Ordering::Relaxed);
893 handle.join().unwrap();
894 }
895 }
896}
897
898type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
899
900fn setup_epoch_handler(
901 cmd: &ServeCommand,
902 store: &mut Store<Host>,
903 component: Component,
904) -> Result<WriteProfile> {
905 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
907 #[cfg(feature = "profiling")]
908 return setup_guest_profiler(store, path.clone(), *interval, component.clone());
909 #[cfg(not(feature = "profiling"))]
910 {
911 let _ = (path, interval);
912 bail!("support for profiling disabled at compile time!");
913 }
914 }
915
916 if cmd.run.common.wasm.timeout.is_some() {
918 store.epoch_deadline_async_yield_and_update(1);
919 }
920
921 Ok(Box::new(|_store| {}))
922}
923
924#[cfg(feature = "profiling")]
925fn setup_guest_profiler(
926 store: &mut Store<Host>,
927 path: String,
928 interval: Duration,
929 component: Component,
930) -> Result<WriteProfile> {
931 use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
932
933 let module_name = "<main>";
934
935 store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
936 store.engine(),
937 module_name,
938 interval,
939 component,
940 std::iter::empty(),
941 )?));
942
943 fn sample(
944 mut store: StoreContextMut<Host>,
945 f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
946 ) {
947 let mut profiler = store.data_mut().guest_profiler.take().unwrap();
948 f(
949 Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
950 store.as_context(),
951 );
952 store.data_mut().guest_profiler = Some(profiler);
953 }
954
955 store.call_hook(|store, kind| {
957 sample(store, |profiler, store| profiler.call_hook(store, kind));
958 Ok(())
959 });
960
961 store.epoch_deadline_callback(move |store| {
962 sample(store, |profiler, store| {
963 profiler.sample(store, std::time::Duration::ZERO)
964 });
965
966 Ok(UpdateDeadline::Continue(1))
967 });
968
969 store.set_epoch_deadline(1);
970
971 let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
972 let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
973 .expect("profiling doesn't support threads yet");
974 if let Err(e) = std::fs::File::create(&path)
975 .map_err(wasmtime::Error::new)
976 .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
977 {
978 eprintln!("failed writing profile at {path}: {e:#}");
979 } else {
980 eprintln!();
981 eprintln!("Profile written to: {path}");
982 eprintln!("View this profile at https://profiler.firefox.com/.");
983 }
984 });
985
986 Ok(write_profile)
987}
988
989fn error_response(status: StatusCode) -> hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>> {
991 Response::builder()
992 .status(status)
993 .body(
994 http_body_util::Empty::new()
995 .map_err(|_| unreachable!())
996 .boxed_unsync(),
997 )
998 .unwrap()
999}
1000
1001#[cfg(feature = "debug")]
1004async fn debug_serve_body(
1005 store: &mut Store<Host>,
1006 proxy_pre: wasmtime_wasi_http::p2::bindings::ProxyPre<Host>,
1007 addr: SocketAddr,
1008) -> Result<()> {
1009 use hyper::server::conn::http1;
1010 use wasmtime_wasi_http::p2::bindings::http::types::Scheme;
1011 use wasmtime_wasi_http::p2::body::HyperOutgoingBody;
1012
1013 type P2Response = std::result::Result<
1014 hyper::Response<HyperOutgoingBody>,
1015 wasmtime_wasi_http::p2::bindings::http::types::ErrorCode,
1016 >;
1017
1018 let engine_clone = store.engine().clone();
1019 let _epoch_thread = std::thread::spawn(move || {
1020 loop {
1021 std::thread::sleep(Duration::from_millis(1));
1022 engine_clone.increment_epoch();
1023 }
1024 });
1025
1026 store.epoch_deadline_async_yield_and_update(1);
1027
1028 let proxy = proxy_pre.instantiate_async(&mut *store).await?;
1030
1031 let socket = match addr {
1033 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
1034 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
1035 };
1036 socket.set_reuseaddr(!cfg!(windows))?;
1037 socket.bind(addr)?;
1038 let listener = socket.listen(100)?;
1039 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
1040
1041 loop {
1043 let (stream, _) = listener.accept().await?;
1044 stream.set_nodelay(true)?;
1045 let stream = TokioIo::new(stream);
1046
1047 type RespBody = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
1050 let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::<(
1051 hyper::Request<hyper::body::Incoming>,
1052 tokio::sync::oneshot::Sender<std::result::Result<RespBody, Infallible>>,
1053 )>(1);
1054
1055 let serve_conn = http1::Builder::new().keep_alive(true).serve_connection(
1056 stream,
1057 hyper::service::service_fn(move |req| {
1058 let req_tx = req_tx.clone();
1059 async move {
1060 let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
1061 if req_tx.send((req, resp_tx)).await.is_err() {
1062 return Ok::<_, Infallible>(error_response(
1063 StatusCode::SERVICE_UNAVAILABLE,
1064 ));
1065 }
1066 resp_rx
1067 .await
1068 .unwrap_or(Ok(error_response(StatusCode::SERVICE_UNAVAILABLE)))
1069 }
1070 }),
1071 );
1072
1073 tokio::pin!(serve_conn);
1074
1075 loop {
1076 tokio::select! {
1077 result = &mut serve_conn => {
1078 if let Err(e) = result {
1079 eprintln!("connection error: {e:?}");
1080 }
1081 break;
1082 }
1083 msg = req_rx.recv() => {
1084 let Some((req, resp_tx)) = msg else { break };
1085
1086 let (p2_tx, p2_rx) = tokio::sync::oneshot::channel::<P2Response>();
1087 let wasi_req = store
1088 .data_mut()
1089 .http()
1090 .new_incoming_request(Scheme::Http, req);
1091 let wasi_out = wasi_req.and_then(|_req| {
1092 let out = store.data_mut().http().new_response_outparam(p2_tx);
1093 out.map(|out| (_req, out))
1094 });
1095 let (wasi_req, wasi_out) = match wasi_out {
1096 Ok(pair) => pair,
1097 Err(e) => {
1098 eprintln!("error creating WASI request: {e:?}");
1099 let _ = resp_tx.send(Ok(error_response(
1100 StatusCode::INTERNAL_SERVER_ERROR,
1101 )));
1102 continue;
1103 }
1104 };
1105
1106 if let Err(e) = proxy
1107 .wasi_http_incoming_handler()
1108 .call_handle(&mut *store, wasi_req, wasi_out)
1109 .await
1110 {
1111 eprintln!("handler error: {e:?}");
1112 }
1113
1114 let resp = match p2_rx.await {
1115 Ok(Ok(resp)) => resp.map(|body| {
1116 body.map_err(|e| e.into()).boxed_unsync()
1117 }),
1118 Ok(Err(e)) => {
1119 eprintln!("component error: {e:?}");
1120 error_response(StatusCode::INTERNAL_SERVER_ERROR)
1121 }
1122 Err(_) => error_response(StatusCode::INTERNAL_SERVER_ERROR),
1123 };
1124 let _ = resp_tx.send(Ok(resp));
1125 }
1126 }
1127 }
1128 }
1129}
1130
1131type Request = hyper::Request<hyper::body::Incoming>;
1132
1133async fn handle_request(
1134 handler: ProxyHandler<HostHandlerState>,
1135 req: Request,
1136) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
1137 use tokio::sync::oneshot;
1138
1139 let req_id = handler.next_req_id();
1140
1141 log::info!(
1142 "Request {req_id} handling {} to {}",
1143 req.method(),
1144 req.uri()
1145 );
1146
1147 type P2Response = Result<
1154 hyper::Response<wasmtime_wasi_http::p2::body::HyperOutgoingBody>,
1155 p2::http::types::ErrorCode,
1156 >;
1157 type P3Response = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
1158
1159 enum Sender {
1160 P2(oneshot::Sender<P2Response>),
1161 P3(oneshot::Sender<P3Response>),
1162 }
1163
1164 enum Receiver {
1165 P2(oneshot::Receiver<P2Response>),
1166 P3(oneshot::Receiver<P3Response>),
1167 }
1168
1169 let (tx, rx) = match handler.instance_pre() {
1170 ProxyPre::P2(_) => {
1171 let (tx, rx) = oneshot::channel();
1172 (Sender::P2(tx), Receiver::P2(rx))
1173 }
1174 ProxyPre::P3(_) => {
1175 let (tx, rx) = oneshot::channel();
1176 (Sender::P3(tx), Receiver::P3(rx))
1177 }
1178 };
1179
1180 handler.spawn(
1181 if handler.state().max_instance_reuse_count() == 1 {
1182 Some(req_id)
1183 } else {
1184 None
1185 },
1186 Box::new(move |store, proxy| {
1187 Box::pin(
1188 async move {
1189 match proxy {
1190 Proxy::P2(proxy) => {
1191 let Sender::P2(tx) = tx else { unreachable!() };
1192 let (req, out) = store.with(move |mut store| {
1193 let req = store
1194 .data_mut()
1195 .http()
1196 .new_incoming_request(p2::http::types::Scheme::Http, req)?;
1197 let out = store.data_mut().http().new_response_outparam(tx)?;
1198 wasmtime::error::Ok((req, out))
1199 })?;
1200
1201 proxy
1202 .wasi_http_incoming_handler()
1203 .call_handle(store, req, out)
1204 .await
1205 }
1206 Proxy::P3(proxy) => {
1207 use wasmtime_wasi_http::p3::bindings::http::types::{
1208 ErrorCode, Request,
1209 };
1210
1211 let Sender::P3(tx) = tx else { unreachable!() };
1212 let (req, body) = req.into_parts();
1213 let body = body.map_err(ErrorCode::from_hyper_request_error);
1214 let req = http::Request::from_parts(req, body);
1215 let (request, request_io_result) = Request::from_http(req);
1216 let res = proxy.handle(store, request).await??;
1217 let res = store
1218 .with(|mut store| res.into_http(&mut store, request_io_result))?;
1219
1220 let (resp_body_tx, resp_body_rx) = oneshot::channel();
1228 let res = res.map(|body| {
1229 let body = body.map_err(|e| e.into());
1230 P3BodyWrapper {
1231 _tx: resp_body_tx,
1232 body,
1233 }
1234 .boxed_unsync()
1235 });
1236
1237 if tx.send(res).is_ok() {
1242 _ = resp_body_rx.await;
1243 }
1244
1245 Ok(())
1246 }
1247 }
1248 }
1249 .map(move |result| {
1250 if let Err(error) = result {
1251 eprintln!("[{req_id}] :: {error:?}");
1252 }
1253 }),
1254 )
1255 }),
1256 );
1257
1258 return Ok(match rx {
1259 Receiver::P2(rx) => rx
1260 .await
1261 .context("guest never invoked `response-outparam::set` method")?
1262 .map_err(|e| wasmtime::Error::from(e))?
1263 .map(|body| body.map_err(|e| e.into()).boxed_unsync()),
1264 Receiver::P3(rx) => rx.await?,
1265 });
1266
1267 struct P3BodyWrapper<B> {
1270 body: B,
1271 _tx: oneshot::Sender<()>,
1272 }
1273
1274 impl<B: Body + Unpin> Body for P3BodyWrapper<B> {
1275 type Data = B::Data;
1276 type Error = B::Error;
1277
1278 fn poll_frame(
1279 mut self: Pin<&mut Self>,
1280 cx: &mut Context<'_>,
1281 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1282 Pin::new(&mut self.body).poll_frame(cx)
1283 }
1284
1285 fn is_end_stream(&self) -> bool {
1286 self.body.is_end_stream()
1287 }
1288
1289 fn size_hint(&self) -> SizeHint {
1290 self.body.size_hint()
1291 }
1292 }
1293}
1294
1295#[derive(Clone)]
1296enum Output {
1297 Stdout,
1298 Stderr,
1299}
1300
1301impl Output {
1302 fn write_all(&self, buf: &[u8]) -> io::Result<()> {
1303 use std::io::Write;
1304
1305 match self {
1306 Output::Stdout => std::io::stdout().write_all(buf),
1307 Output::Stderr => std::io::stderr().write_all(buf),
1308 }
1309 }
1310}
1311
1312#[derive(Clone)]
1313struct LogStream {
1314 output: Output,
1315 state: Arc<LogStreamState>,
1316}
1317
1318struct LogStreamState {
1319 prefix: String,
1320 needs_prefix_on_next_write: AtomicBool,
1321}
1322
1323impl LogStream {
1324 fn new(prefix: String, output: Output) -> LogStream {
1325 LogStream {
1326 output,
1327 state: Arc::new(LogStreamState {
1328 prefix,
1329 needs_prefix_on_next_write: AtomicBool::new(true),
1330 }),
1331 }
1332 }
1333
1334 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
1335 while !bytes.is_empty() {
1336 if self
1337 .state
1338 .needs_prefix_on_next_write
1339 .load(Ordering::Relaxed)
1340 {
1341 self.output.write_all(self.state.prefix.as_bytes())?;
1342 self.state
1343 .needs_prefix_on_next_write
1344 .store(false, Ordering::Relaxed);
1345 }
1346 match bytes.iter().position(|b| *b == b'\n') {
1347 Some(i) => {
1348 let (a, b) = bytes.split_at(i + 1);
1349 bytes = b;
1350 self.output.write_all(a)?;
1351 self.state
1352 .needs_prefix_on_next_write
1353 .store(true, Ordering::Relaxed);
1354 }
1355 None => {
1356 self.output.write_all(bytes)?;
1357 break;
1358 }
1359 }
1360 }
1361
1362 Ok(())
1363 }
1364}
1365
1366impl wasmtime_wasi::cli::StdoutStream for LogStream {
1367 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
1368 Box::new(self.clone())
1369 }
1370 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
1371 Box::new(self.clone())
1372 }
1373}
1374
1375impl wasmtime_wasi::cli::IsTerminal for LogStream {
1376 fn is_terminal(&self) -> bool {
1377 match &self.output {
1378 Output::Stdout => std::io::stdout().is_terminal(),
1379 Output::Stderr => std::io::stderr().is_terminal(),
1380 }
1381 }
1382}
1383
1384impl wasmtime_wasi::p2::OutputStream for LogStream {
1385 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1386 self.write_all(&bytes)
1387 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1388 Ok(())
1389 }
1390
1391 fn flush(&mut self) -> StreamResult<()> {
1392 Ok(())
1393 }
1394
1395 fn check_write(&mut self) -> StreamResult<usize> {
1396 Ok(1024 * 1024)
1397 }
1398}
1399
1400#[async_trait::async_trait]
1401impl wasmtime_wasi::p2::Pollable for LogStream {
1402 async fn ready(&mut self) {}
1403}
1404
1405impl AsyncWrite for LogStream {
1406 fn poll_write(
1407 mut self: Pin<&mut Self>,
1408 _cx: &mut Context<'_>,
1409 buf: &[u8],
1410 ) -> Poll<io::Result<usize>> {
1411 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1412 }
1413 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1414 Poll::Ready(Ok(()))
1415 }
1416 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1417 Poll::Ready(Ok(()))
1418 }
1419}
1420
1421fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1443 use wasmtime::{Config, Memory, MemoryType};
1444 const BITS_TO_TEST: u32 = 42;
1445 let mut config = Config::new();
1446 config.wasm_memory64(true);
1447 config.memory_reservation(1 << BITS_TO_TEST);
1448 let engine = Engine::new(&config)?;
1449 let mut store = Store::new(&engine, ());
1450 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1453 if Memory::new(&mut store, ty).is_ok() {
1454 Ok(Some(true))
1455 } else {
1456 Ok(None)
1457 }
1458}