Skip to main content

wasmtime_cli/commands/
serve.rs

1use crate::common::{HttpHooks, Profile, RunCommon, RunTarget};
2use bytes::Bytes;
3use clap::Parser;
4use http::{HeaderMap, HeaderName, HeaderValue, Response, StatusCode};
5use http_body_util::combinators::UnsyncBoxBody;
6use http_body_util::{BodyExt as _, Full};
7use hyper::server::conn::http1;
8use pin_project_lite::pin_project;
9use std::convert::Infallible;
10use std::ffi::OsString;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::{
15    path::PathBuf,
16    sync::{
17        Arc, Mutex,
18        atomic::{AtomicBool, AtomicU64, Ordering},
19    },
20    time::{Duration, Instant},
21};
22use tokio::io::{self, AsyncWrite};
23use tokio::sync::{Notify, Semaphore};
24use wasmtime::component::{Component, GuestTaskId, Linker};
25use wasmtime::error::Context as _;
26use wasmtime::{
27    AsContextMut as _, Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail,
28};
29use wasmtime_cli_flags::opt::WasmtimeOptionValue;
30use wasmtime_wasi::p2::{StreamError, StreamResult};
31use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
32use wasmtime_wasi_http::WasiHttpCtx;
33use wasmtime_wasi_http::handler::{
34    self, HandlerState, Instance, Prepared, Proxy, ProxyHandler, ProxyPre, ShouldAccept, ViewFn,
35    WorkerExpiration, WorkerState, WorkerStatus,
36};
37use wasmtime_wasi_http::io::TokioIo;
38
39#[cfg(feature = "debug")]
40use crate::commands::run::RunCommand;
41
42#[cfg(feature = "wasi-config")]
43use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
44#[cfg(feature = "wasi-keyvalue")]
45use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
46#[cfg(feature = "wasi-nn")]
47use wasmtime_wasi_nn::wit::WasiNnCtx;
48
49const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
50const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
51const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
52
53struct Host {
54    table: wasmtime::component::ResourceTable,
55    ctx: WasiCtx,
56    http: WasiHttpCtx,
57    hooks: HttpHooks,
58
59    limits: StoreLimits,
60
61    #[cfg(feature = "wasi-nn")]
62    nn: Option<WasiNnCtx>,
63
64    #[cfg(feature = "wasi-config")]
65    wasi_config: Option<WasiConfigVariables>,
66
67    #[cfg(feature = "wasi-keyvalue")]
68    wasi_keyvalue: Option<WasiKeyValueCtx>,
69
70    #[cfg(feature = "profiling")]
71    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
72
73    write_profile: Option<WriteProfile>,
74}
75
76impl WasiView for Host {
77    fn ctx(&mut self) -> WasiCtxView<'_> {
78        WasiCtxView {
79            ctx: &mut self.ctx,
80            table: &mut self.table,
81        }
82    }
83}
84
85impl wasmtime_wasi_http::p2::WasiHttpView for Host {
86    fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
87        wasmtime_wasi_http::p2::WasiHttpCtxView {
88            ctx: &mut self.http,
89            table: &mut self.table,
90            hooks: &mut self.hooks,
91        }
92    }
93}
94
95#[cfg(feature = "component-model-async")]
96impl wasmtime_wasi_http::p3::WasiHttpView for Host {
97    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
98        wasmtime_wasi_http::p3::WasiHttpCtxView {
99            table: &mut self.table,
100            ctx: &mut self.http,
101            hooks: &mut self.hooks,
102        }
103    }
104}
105
106const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
107    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
108    8080,
109);
110
111fn parse_duration(s: &str) -> Result<Duration, String> {
112    Duration::parse(Some(s)).map_err(|e| e.to_string())
113}
114
115/// Runs a WebAssembly module
116#[derive(Parser)]
117pub struct ServeCommand {
118    #[command(flatten)]
119    run: RunCommon,
120
121    /// Socket address for the web server to bind to.
122    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
123    addr: SocketAddr,
124
125    /// Socket address where, when connected to, will initiate a graceful
126    /// shutdown.
127    ///
128    /// Note that graceful shutdown is also supported on ctrl-c.
129    #[arg(long, value_name = "SOCKADDR")]
130    shutdown_addr: Option<SocketAddr>,
131
132    /// Disable log prefixes of wasi-http handlers.
133    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
134    #[arg(long)]
135    no_logging_prefix: bool,
136
137    /// The WebAssembly component to run.
138    #[arg(value_name = "WASM", required = true)]
139    component: PathBuf,
140
141    /// Maximum number of requests to send to a single component instance before
142    /// dropping it.
143    ///
144    /// This defaults to 1 for WASIp2 components and 128 for WASIp3 components.
145    #[arg(long)]
146    max_instance_reuse_count: Option<usize>,
147
148    /// Maximum number of concurrent requests to send to a single component
149    /// instance.
150    ///
151    /// This defaults to 1 for WASIp2 components and 16 for WASIp3 components.
152    /// Note that setting it to more than 1 will have no effect for WASIp2
153    /// components since they cannot be called concurrently.
154    #[arg(long)]
155    max_instance_concurrent_reuse_count: Option<usize>,
156
157    /// Time to hold an idle component instance for possible reuse before
158    /// dropping it.
159    ///
160    /// A number with no suffix or with an `s` suffix is interpreted as seconds;
161    /// other accepted suffixes include `ms` (milliseconds), `us` or `μs`
162    /// (microseconds), and `ns` (nanoseconds).
163    #[arg(long, default_value = "1s", value_parser = parse_duration)]
164    idle_instance_timeout: Duration,
165
166    /// Replace or add a request header before forwarding it to the component.
167    ///
168    /// The argument must have the form `name: value`. May be specified more
169    /// than once. An argument beginning with `@` is treated as a file containing
170    /// one header per line.
171    #[arg(short = 'H', long = "header", value_name = "HEADER")]
172    headers: Vec<String>,
173
174    /// Maximum number of concurrent requests that can be processed at any one
175    /// point in time.
176    #[arg(long)]
177    max_concurrent_requests: Option<usize>,
178    /// Maximum number of concurrent connections that can be held at any one
179    /// point in time.
180    #[arg(long)]
181    max_concurrent_connections: Option<usize>,
182}
183
184impl ServeCommand {
185    /// Start a server to run the given wasi-http proxy component
186    pub fn execute(mut self) -> Result<()> {
187        self.run.common.init_logging()?;
188
189        // We force cli errors before starting to listen for connections so then
190        // we don't accidentally delay them to the first request.
191
192        if self.run.common.wasi.nn == Some(true) {
193            #[cfg(not(feature = "wasi-nn"))]
194            {
195                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
196            }
197        }
198
199        if self.run.common.wasi.threads == Some(true) {
200            bail!("wasi-threads does not support components yet")
201        }
202
203        // The serve command requires both wasi-http and the component model, so
204        // we enable those by default here.
205        if self.run.common.wasi.http.replace(true) == Some(false) {
206            bail!("wasi-http is required for the serve command, and must not be disabled");
207        }
208        if self.run.common.wasm.component_model.replace(true) == Some(false) {
209            bail!("components are required for the serve command, and must not be disabled");
210        }
211
212        let runtime = tokio::runtime::Builder::new_multi_thread()
213            .enable_time()
214            .enable_io()
215            .build()?;
216
217        runtime.block_on(self.serve())?;
218
219        Ok(())
220    }
221
222    /// Set up the debugger component side-car, mirroring
223    /// [`RunCommand::debugger_run`].
224    #[cfg(feature = "debug")]
225    fn debugger_setup(&mut self) -> Result<Option<RunCommand>> {
226        fn set_implicit_option(
227            place: &str,
228            name: &str,
229            setting: &mut Option<bool>,
230            value: bool,
231        ) -> Result<()> {
232            if *setting == Some(!value) {
233                bail!(
234                    "Explicitly-set option on {place} {name}={} is not compatible \
235                     with debugging-implied setting {value}",
236                    setting.unwrap()
237                );
238            }
239            *setting = Some(value);
240            Ok(())
241        }
242
243        #[cfg(feature = "gdbstub")]
244        let override_bytes = if let Some(addr) = self.run.gdbstub.as_deref() {
245            if self.run.common.debug.debugger.is_some() {
246                bail!("-g/--gdb cannot be combined with -Ddebugger=");
247            }
248            let addr = if addr.parse::<u16>().is_ok() {
249                format!("127.0.0.1:{addr}")
250            } else {
251                use std::net::SocketAddr as SA;
252                addr.parse::<SA>()
253                    .with_context(|| format!("invalid gdbstub address: `{addr}`"))?;
254                addr.to_string()
255            };
256            self.run.common.debug.debugger = Some("<built-in gdbstub>".into());
257            self.run.common.debug.arg.push(addr);
258            Some(gdbstub_component_artifact::GDBSTUB_COMPONENT)
259        } else {
260            None
261        };
262        #[cfg(not(feature = "gdbstub"))]
263        let override_bytes = None;
264
265        if let Some(debugger_component_path) = self.run.common.debug.debugger.as_ref() {
266            set_implicit_option(
267                "debuggee",
268                "guest_debug",
269                &mut self.run.common.debug.guest_debug,
270                true,
271            )?;
272            set_implicit_option(
273                "debuggee",
274                "epoch_interruption",
275                &mut self.run.common.wasm.epoch_interruption,
276                true,
277            )?;
278
279            let mut debugger_run = RunCommand::try_parse_from(
280                ["run".into(), debugger_component_path.into()]
281                    .into_iter()
282                    .chain(self.run.common.debug.arg.iter().map(OsString::from)),
283            )?;
284            debugger_run.module_bytes = override_bytes;
285
286            debugger_run.run.common.wasi.tcp.get_or_insert(true);
287            debugger_run
288                .run
289                .common
290                .wasi
291                .inherit_network
292                .get_or_insert(true);
293
294            set_implicit_option(
295                "debugger",
296                "inherit_stdin",
297                &mut debugger_run.run.common.wasi.inherit_stdin,
298                self.run.common.debug.inherit_stdin.unwrap_or(false),
299            )?;
300            set_implicit_option(
301                "debugger",
302                "inherit_stdout",
303                &mut debugger_run.run.common.wasi.inherit_stdout,
304                self.run.common.debug.inherit_stdout.unwrap_or(false),
305            )?;
306            set_implicit_option(
307                "debugger",
308                "inherit_stderr",
309                &mut debugger_run.run.common.wasi.inherit_stderr,
310                self.run.common.debug.inherit_stderr.unwrap_or(false),
311            )?;
312            Ok(Some(debugger_run))
313        } else {
314            Ok(None)
315        }
316    }
317
318    /// Run the HTTP server under a debugger component.
319    ///
320    /// Uses a single store and instance to handle all requests
321    /// sequentially, so the debugger can pause and inspect state.
322    #[cfg(feature = "debug")]
323    async fn serve_under_debugger(
324        self,
325        mut debug_run: RunCommand,
326        linker: Linker<Host>,
327        component: Component,
328    ) -> Result<()> {
329        let mut debuggee_store = self.new_store(linker.engine(), None)?;
330
331        // Pre-register component modules so the debugger can see
332        // them and set breakpoints at the initial stop.
333        debuggee_store.debug_register_component(&component)?;
334
335        let debug_engine = debug_run.new_engine()?;
336        let debug_main = debug_run.run.load_module(
337            &debug_engine,
338            debug_run.module_and_args[0].as_ref(),
339            debug_run.module_bytes.as_ref().map(|v| &v[..]),
340        )?;
341        let (mut debug_store, debug_linker) =
342            debug_run.new_store_and_linker(&debug_engine, &debug_main)?;
343        let debug_component = match debug_main {
344            RunTarget::Core(_) => {
345                bail!("Debugger component is a core module; only components are supported")
346            }
347            RunTarget::Component(c) => c,
348        };
349        let mut debug_linker = match debug_linker {
350            crate::commands::run::CliLinker::Core(_) => unreachable!(),
351            crate::commands::run::CliLinker::Component(l) => l,
352        };
353        debug_run.add_debugger_api(&mut debug_linker)?;
354
355        debug_run
356            .invoke_debugger(
357                &mut debug_store,
358                &debug_component,
359                &mut debug_linker,
360                debuggee_store,
361                move |store| Box::pin(self.serve_maybe_debug(linker, component, Some(store))),
362            )
363            .await
364    }
365
366    fn new_store(&self, engine: &Engine, instance_id: Option<u64>) -> Result<Store<Host>> {
367        let mut builder = WasiCtxBuilder::new();
368        self.run.configure_wasip2(&mut builder)?;
369
370        if let Some(instance_id) = instance_id {
371            builder.env("INSTANCE_ID", instance_id.to_string());
372        }
373
374        let stdout_prefix: String;
375        let stderr_prefix: String;
376        match instance_id {
377            Some(instance_id) if !self.no_logging_prefix => {
378                stdout_prefix = format!("stdout [{instance_id}] :: ");
379                stderr_prefix = format!("stderr [{instance_id}] :: ");
380            }
381            _ => {
382                stdout_prefix = "".to_string();
383                stderr_prefix = "".to_string();
384            }
385        }
386        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
387        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
388
389        let mut table = wasmtime::component::ResourceTable::new();
390        if let Some(max) = self.run.common.wasi.max_resources {
391            table.set_max_capacity(max);
392        }
393        let mut host = Host {
394            table,
395            ctx: builder.build(),
396            http: self.run.wasi_http_ctx()?,
397            hooks: self.run.wasi_http_hooks(),
398
399            limits: StoreLimits::default(),
400
401            #[cfg(feature = "wasi-nn")]
402            nn: None,
403            #[cfg(feature = "wasi-config")]
404            wasi_config: None,
405            #[cfg(feature = "wasi-keyvalue")]
406            wasi_keyvalue: None,
407            #[cfg(feature = "profiling")]
408            guest_profiler: None,
409            write_profile: None,
410        };
411
412        if self.run.common.wasi.nn == Some(true) {
413            #[cfg(feature = "wasi-nn")]
414            {
415                let graphs = self
416                    .run
417                    .common
418                    .wasi
419                    .nn_graph
420                    .iter()
421                    .map(|g| (g.format.clone(), g.dir.clone()))
422                    .collect::<Vec<_>>();
423                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
424                host.nn.replace(WasiNnCtx::new(backends, registry));
425            }
426        }
427
428        if self.run.common.wasi.config == Some(true) {
429            #[cfg(feature = "wasi-config")]
430            {
431                let vars = WasiConfigVariables::from_iter(
432                    self.run
433                        .common
434                        .wasi
435                        .config_var
436                        .iter()
437                        .map(|v| (v.key.clone(), v.value.clone())),
438                );
439                host.wasi_config.replace(vars);
440            }
441        }
442
443        if self.run.common.wasi.keyvalue == Some(true) {
444            #[cfg(feature = "wasi-keyvalue")]
445            {
446                let ctx = WasiKeyValueCtxBuilder::new()
447                    .in_memory_data(
448                        self.run
449                            .common
450                            .wasi
451                            .keyvalue_in_memory_data
452                            .iter()
453                            .map(|v| (v.key.clone(), v.value.clone())),
454                    )
455                    .build();
456                host.wasi_keyvalue.replace(ctx);
457            }
458        }
459
460        let mut store = Store::new(engine, host);
461
462        if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
463            store.set_hostcall_fuel(fuel);
464        }
465
466        store.data_mut().limits = self.run.store_limits();
467        store.limiter(|t| &mut t.limits);
468
469        // If fuel has been configured, we want to add the configured
470        // fuel amount to this store.
471        if let Some(fuel) = self.run.common.wasm.fuel {
472            store.set_fuel(fuel)?;
473        }
474
475        Ok(store)
476    }
477
478    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
479        self.run.validate_p3_option()?;
480        let cli = self.run.validate_cli_enabled()?;
481
482        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
483        // to serve as a signal to enable all WASI interfaces instead of just
484        // those in the `proxy` world. If `-Scli` is present then add all
485        // `command` APIs and then additionally add in the required HTTP APIs.
486        //
487        // If `-Scli` isn't passed then use the `add_to_linker_async`
488        // bindings which adds just those interfaces that the proxy interface
489        // uses.
490        if cli == Some(true) {
491            self.run.add_wasmtime_wasi_to_linker(linker)?;
492            wasmtime_wasi_http::p2::add_only_http_to_linker_async(linker)?;
493            #[cfg(feature = "component-model-async")]
494            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
495                wasmtime_wasi_http::p3::add_to_linker(linker)?;
496            }
497        } else {
498            wasmtime_wasi_http::p2::add_to_linker_async(linker)?;
499            #[cfg(feature = "component-model-async")]
500            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
501                wasmtime_wasi_http::p3::add_to_linker(linker)?;
502                wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
503                wasmtime_wasi::p3::random::add_to_linker(linker)?;
504                wasmtime_wasi::p3::cli::add_to_linker(linker)?;
505            }
506        }
507
508        if self.run.common.wasi.nn == Some(true) {
509            #[cfg(not(feature = "wasi-nn"))]
510            {
511                bail!("support for wasi-nn was disabled at compile time");
512            }
513            #[cfg(feature = "wasi-nn")]
514            {
515                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
516                    let ctx = h.nn.as_mut().unwrap();
517                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
518                })?;
519            }
520        }
521
522        if self.run.common.wasi.config == Some(true) {
523            #[cfg(not(feature = "wasi-config"))]
524            {
525                bail!("support for wasi-config was disabled at compile time");
526            }
527            #[cfg(feature = "wasi-config")]
528            {
529                wasmtime_wasi_config::add_to_linker(linker, |h| {
530                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
531                })?;
532            }
533        }
534
535        if self.run.common.wasi.keyvalue == Some(true) {
536            #[cfg(not(feature = "wasi-keyvalue"))]
537            {
538                bail!("support for wasi-keyvalue was disabled at compile time");
539            }
540            #[cfg(feature = "wasi-keyvalue")]
541            {
542                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
543                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
544                })?;
545            }
546        }
547
548        if self.run.common.wasi.threads == Some(true) {
549            bail!("support for wasi-threads is not available with components");
550        }
551
552        if self.run.common.wasi.http == Some(false) {
553            bail!("support for wasi-http must be enabled for `serve` subcommand");
554        }
555
556        Ok(())
557    }
558
559    async fn serve(mut self) -> Result<()> {
560        #[cfg(feature = "debug")]
561        let debug_run = self.debugger_setup()?;
562
563        let mut config = self
564            .run
565            .common
566            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
567        config.wasm_component_model(true);
568
569        if self.run.common.wasm.timeout.is_some() {
570            config.epoch_interruption(true);
571        }
572
573        match self.run.profile {
574            Some(Profile::Native(s)) => {
575                config.profiler(s);
576            }
577            Some(Profile::Guest { .. }) => {
578                config.epoch_interruption(true);
579            }
580            None => {}
581        }
582
583        let engine = Engine::new(&config)?;
584        let mut linker = Linker::new(&engine);
585
586        self.add_to_linker(&mut linker)?;
587
588        let component = match self.run.load_module(&engine, &self.component, None)? {
589            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
590            RunTarget::Component(c) => c,
591        };
592
593        #[cfg(feature = "debug")]
594        if let Some(debug_run) = debug_run {
595            return self
596                .serve_under_debugger(debug_run, linker, component)
597                .await;
598        }
599
600        self.serve_maybe_debug(linker, component, None).await
601    }
602
603    async fn serve_maybe_debug(
604        self,
605        linker: Linker<Host>,
606        component: Component,
607        mut debuggee_store: Option<&mut Store<Host>>,
608    ) -> Result<()> {
609        let engine = linker.engine();
610        let request_headers = RequestHeaders::parse(&self.headers)?;
611        let instance = linker.instantiate_pre(&component)?;
612        #[cfg(feature = "component-model-async")]
613        let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
614            Ok(pre) => ProxyPre::P3(pre),
615            Err(_) => ProxyPre::P2(wasmtime_wasi_http::p2::bindings::ProxyPre::new(instance)?),
616        };
617        #[cfg(not(feature = "component-model-async"))]
618        let instance = ProxyPre::P2(wasmtime_wasi_http::p2::bindings::ProxyPre::new(instance)?);
619
620        // Spawn background task(s) waiting for graceful shutdown signals. This
621        // always listens for ctrl-c but additionally can listen for a TCP
622        // connection to the specified address.
623        let shutdown = Arc::new(GracefulShutdown::default());
624        tokio::task::spawn({
625            let shutdown = shutdown.clone();
626            async move {
627                tokio::signal::ctrl_c().await.unwrap();
628                shutdown.requested.notify_one();
629            }
630        });
631        if let Some(addr) = self.shutdown_addr {
632            let listener = tokio::net::TcpListener::bind(addr).await?;
633            eprintln!(
634                "Listening for shutdown on tcp://{}/",
635                listener.local_addr()?
636            );
637            let shutdown = shutdown.clone();
638            tokio::task::spawn(async move {
639                let _ = listener.accept().await;
640                shutdown.requested.notify_one();
641            });
642        }
643
644        let socket = match &self.addr {
645            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
646            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
647        };
648        // Conditionally enable `SO_REUSEADDR` depending on the current
649        // platform. On Unix we want this to be able to rebind an address in
650        // the `TIME_WAIT` state which can happen then a server is killed with
651        // active TCP connections and then restarted. On Windows though if
652        // `SO_REUSEADDR` is specified then it enables multiple applications to
653        // bind the port at the same time which is not something we want. Hence
654        // this is conditionally set based on the platform (and deviates from
655        // Tokio's default from always-on).
656        socket.set_reuseaddr(!cfg!(windows))?;
657        socket.bind(self.addr)?;
658        let listener = socket.listen(100)?;
659
660        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
661
662        log::info!("Listening on {}", self.addr);
663
664        let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
665            Some(interval)
666        } else if let Some(t) = self.run.common.wasm.timeout {
667            Some(EPOCH_INTERRUPT_PERIOD.min(t))
668        } else if debuggee_store.is_some() {
669            Some(Duration::from_millis(1))
670        } else {
671            None
672        };
673        let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));
674
675        let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
676            if let ProxyPre::P3(_) = &instance {
677                DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
678            } else {
679                DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
680            }
681        });
682
683        let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
684            self.max_instance_concurrent_reuse_count
685                .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
686        } else {
687            1
688        };
689
690        let max_concurrent_connections = self
691            .max_concurrent_connections
692            .unwrap_or(if debuggee_store.is_some() { 1 } else { 1000 });
693        let max_concurrent_requests = self
694            .max_concurrent_requests
695            .unwrap_or(if debuggee_store.is_some() { 1 } else { 1000 });
696        if debuggee_store.is_some() && max_concurrent_connections != 1 {
697            bail!("cannot have more than 1 max concurrent connections with a debugger");
698        }
699        if debuggee_store.is_some() && max_concurrent_requests != 1 {
700            bail!("cannot have more than 1 max concurrent requests with a debugger");
701        }
702
703        let sem_connections = Arc::new(Semaphore::new(max_concurrent_connections));
704
705        let handler = ProxyHandler::new(HostHandlerState {
706            sem_requests: Semaphore::new(max_concurrent_requests),
707            cmd: self,
708            component,
709            request_headers,
710            max_instance_reuse_count,
711            max_instance_concurrent_reuse_count,
712            instance,
713            next_instance_id: AtomicU64::default(),
714            next_request_id: AtomicU64::default(),
715            // Give one shutdown guard to this handler which will track the
716            // full lifetime of any instances spawned.
717            _shutdown_guard: Box::new(shutdown.clone().increment()),
718        });
719
720        loop {
721            // Wait for a socket, but also "race" against shutdown to break out
722            // of this loop. Once the graceful shutdown signal is received then
723            // this loop exits immediately.
724            let (connection_permit, stream) = tokio::select! {
725                _ = shutdown.requested.notified() => break,
726                v = async {
727                    let permit = sem_connections.clone().acquire_owned().await?;
728                    let (stream, _) = listener.accept().await?;
729                    wasmtime::error::Ok((permit, stream))
730                } => v?,
731            };
732
733            // The Nagle algorithm can impose a significant latency penalty
734            // (e.g. 40ms on Linux) on guests which write small, intermittent
735            // response body chunks (e.g. SSE streams).  Here we disable that
736            // algorithm and rely on the guest to buffer if appropriate to avoid
737            // TCP fragmentation.
738            stream.set_nodelay(true)?;
739
740            // In addition to the shutdown guard given to the handler above,
741            // also give one to the tokio tasks doing HTTP I/O as well to ensure
742            // it keeps them alive too.
743            let shutdown_guard = shutdown.clone().increment();
744
745            // When debugging, handle the client synchronously since
746            // concurrent requests can't be served. Otherwise though spawn a
747            // task to handle this client.
748            match &mut debuggee_store {
749                Some(store) => {
750                    handle_client(stream, &handler, Some(store)).await;
751                }
752                None => {
753                    let handler = handler.clone();
754                    tokio::task::spawn(async move {
755                        handle_client(stream, &handler, None).await;
756                        drop(shutdown_guard);
757                        drop(connection_permit);
758                    });
759                }
760            }
761        }
762
763        // Don't allow any further requests to get picked up.
764        handler.state().sem_requests.close();
765
766        drop(handler);
767
768        // Upon exiting the loop we'll no longer process any more incoming
769        // connections but there may still be outstanding connections
770        // processing in child tasks. If there are wait for those to complete
771        // before shutting down completely. Also enable short-circuiting this
772        // wait with a second ctrl-c signal.
773        if shutdown.close() {
774            return Ok(());
775        }
776        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
777        tokio::select! {
778            _ = tokio::signal::ctrl_c() => {}
779            _ = shutdown.complete.notified() => {}
780        }
781
782        Ok(())
783    }
784}
785
786pin_project! {
787    struct HostWorkerExpiration {
788        idle_timeout: Duration,
789        request_timeout: Duration,
790        #[pin]
791        sleep: tokio::time::Sleep,
792    }
793}
794
795impl WorkerExpiration for HostWorkerExpiration {
796    fn poll(
797        self: Pin<&mut Self>,
798        cx: &mut Context<'_>,
799        status: WorkerStatus,
800        start: Instant,
801    ) -> Poll<()> {
802        let mut me = self.project();
803
804        let timeout = match status {
805            WorkerStatus::Idle => *me.idle_timeout,
806            // TODO: add a dedicated `post_return_timeout` config setting
807            // instead of reusing `request_timeout` for
808            // `WorkerStatus::PostReturn` here
809            WorkerStatus::Requests | WorkerStatus::PostReturn => *me.request_timeout,
810        };
811
812        if let Some(deadline) = start.checked_add(timeout) {
813            let deadline = deadline.into();
814            if deadline != me.sleep.deadline() {
815                me.sleep.as_mut().reset(deadline);
816            }
817            me.sleep.poll(cx)
818        } else {
819            Poll::Pending
820        }
821    }
822}
823
824struct HostWorkerState {
825    instance_id: u64,
826    max_instance_reuse_count: usize,
827    max_instance_concurrent_reuse_count: usize,
828    request_timeout: Duration,
829}
830
831impl WorkerState for HostWorkerState {
832    type StoreData = Host;
833    type RequestId = u64;
834
835    fn should_accept_request(&self, concurrent_count: usize, total_count: usize) -> ShouldAccept {
836        if total_count >= self.max_instance_reuse_count {
837            ShouldAccept::Never
838        } else if concurrent_count >= self.max_instance_concurrent_reuse_count {
839            ShouldAccept::No
840        } else {
841            ShouldAccept::Yes
842        }
843    }
844
845    fn on_request_start(
846        &self,
847        _store: StoreContextMut<Host>,
848        request_id: u64,
849        _task_id: GuestTaskId,
850    ) -> Pin<Box<dyn Future<Output = ()> + 'static + Send + Sync>> {
851        log::info!(
852            "Instance {} handling request {request_id}",
853            self.instance_id,
854        );
855
856        Box::pin(tokio::time::sleep(self.request_timeout))
857    }
858
859    fn drop(&self, mut store: Store<Self::StoreData>, result: Result<(), wasmtime::Error>) {
860        if let Err(error) = result {
861            eprintln!("worker failed: {error:?}");
862        }
863
864        if let Some(write_profile) = store.data_mut().write_profile.take() {
865            write_profile(store.as_context_mut());
866        }
867
868        drop(store);
869    }
870}
871
872struct HostHandlerState {
873    cmd: ServeCommand,
874    component: Component,
875    request_headers: RequestHeaders,
876    max_instance_reuse_count: usize,
877    max_instance_concurrent_reuse_count: usize,
878    instance: ProxyPre<Host>,
879    next_instance_id: AtomicU64,
880    next_request_id: AtomicU64,
881    sem_requests: Semaphore,
882    _shutdown_guard: Box<dyn std::any::Any + Send + Sync>,
883}
884
885impl HostHandlerState {
886    async fn instantiate_into(&self, store: &mut Store<Host>) -> Result<Proxy> {
887        let write_profile = setup_epoch_handler(&self.cmd, &mut *store, self.component.clone())?;
888        store.data_mut().write_profile = Some(write_profile);
889        self.instance.instantiate_async(&mut *store).await
890    }
891
892    fn view(&self) -> ViewFn<Host> {
893        match &self.instance {
894            ProxyPre::P2(_) => ViewFn::P2(wasmtime_wasi_http::p2::WasiHttpView::http),
895            ProxyPre::P3(_) => ViewFn::P3(wasmtime_wasi_http::p3::WasiHttpView::http),
896        }
897    }
898}
899
900impl HandlerState for HostHandlerState {
901    type StoreData = Host;
902    type WorkerExpiration = HostWorkerExpiration;
903    type WorkerState = HostWorkerState;
904
905    async fn instantiate(
906        &self,
907    ) -> Result<Instance<Self::StoreData, Self::WorkerExpiration, Self::WorkerState>> {
908        let instance_id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
909        let mut store = self
910            .cmd
911            .new_store(self.component.engine(), Some(instance_id))?;
912        let proxy = self.instantiate_into(&mut store).await?;
913
914        Ok(Instance {
915            store,
916            proxy,
917            view: self.view(),
918            expiration: HostWorkerExpiration {
919                idle_timeout: self.cmd.idle_instance_timeout,
920                request_timeout: self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX),
921                sleep: tokio::time::sleep(Duration::MAX),
922            },
923            state: HostWorkerState {
924                max_instance_reuse_count: self.max_instance_reuse_count,
925                max_instance_concurrent_reuse_count: self.max_instance_concurrent_reuse_count,
926                instance_id,
927                request_timeout: self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX),
928            },
929        })
930    }
931}
932
933/// Helper structure to manage graceful shutdown int he accept loop above.
934#[derive(Default)]
935struct GracefulShutdown {
936    /// Async notification that shutdown has been requested.
937    requested: Notify,
938    /// Async notification that shutdown has completed, signaled when
939    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
940    complete: Notify,
941    /// Internal state related to what's in progress when shutdown is requested.
942    state: Mutex<GracefulShutdownState>,
943}
944
945#[derive(Default)]
946struct GracefulShutdownState {
947    active_tasks: u32,
948    notify_when_done: bool,
949}
950
951impl GracefulShutdown {
952    /// Increments the number of active tasks and returns a guard indicating
953    fn increment(self: Arc<Self>) -> impl Drop + Send + Sync {
954        struct Guard(Arc<GracefulShutdown>);
955
956        let mut state = self.state.lock().unwrap();
957        assert!(!state.notify_when_done);
958        state.active_tasks += 1;
959        drop(state);
960
961        return Guard(self);
962
963        impl Drop for Guard {
964            fn drop(&mut self) {
965                let mut state = self.0.state.lock().unwrap();
966                state.active_tasks -= 1;
967                if state.notify_when_done && state.active_tasks == 0 {
968                    self.0.complete.notify_one();
969                }
970            }
971        }
972    }
973
974    /// Flags this state as done spawning tasks and returns whether there are no
975    /// more child tasks remaining.
976    fn close(&self) -> bool {
977        let mut state = self.state.lock().unwrap();
978        state.notify_when_done = true;
979        state.active_tasks == 0
980    }
981}
982
983/// When executing with a timeout enabled, this is how frequently epoch
984/// interrupts will be executed to check for timeouts. If guest profiling
985/// is enabled, the guest epoch period will be used.
986const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
987
988struct EpochThread {
989    shutdown: Arc<AtomicBool>,
990    handle: Option<std::thread::JoinHandle<()>>,
991}
992
993impl EpochThread {
994    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
995        let shutdown = Arc::new(AtomicBool::new(false));
996        let handle = {
997            let shutdown = Arc::clone(&shutdown);
998            let handle = std::thread::spawn(move || {
999                while !shutdown.load(Ordering::Relaxed) {
1000                    std::thread::sleep(interval);
1001                    engine.increment_epoch();
1002                }
1003            });
1004            Some(handle)
1005        };
1006
1007        EpochThread { shutdown, handle }
1008    }
1009}
1010
1011impl Drop for EpochThread {
1012    fn drop(&mut self) {
1013        if let Some(handle) = self.handle.take() {
1014            self.shutdown.store(true, Ordering::Relaxed);
1015            handle.join().unwrap();
1016        }
1017    }
1018}
1019
1020type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
1021
1022fn setup_epoch_handler(
1023    cmd: &ServeCommand,
1024    store: &mut Store<Host>,
1025    component: Component,
1026) -> Result<WriteProfile> {
1027    // Profiling Enabled
1028    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
1029        #[cfg(feature = "profiling")]
1030        return setup_guest_profiler(store, path.clone(), *interval, component.clone());
1031        #[cfg(not(feature = "profiling"))]
1032        {
1033            let _ = (path, interval);
1034            bail!("support for profiling disabled at compile time!");
1035        }
1036    }
1037
1038    // Profiling disabled but there's a global request timeout
1039    if cmd.run.common.wasm.timeout.is_some() || cmd.run.common.debug.debugger.is_some() {
1040        store.epoch_deadline_async_yield_and_update(1);
1041    }
1042
1043    Ok(Box::new(|_store| {}))
1044}
1045
1046#[cfg(feature = "profiling")]
1047fn setup_guest_profiler(
1048    store: &mut Store<Host>,
1049    path: String,
1050    interval: Duration,
1051    component: Component,
1052) -> Result<WriteProfile> {
1053    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
1054
1055    let module_name = "<main>";
1056
1057    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
1058        store.engine(),
1059        module_name,
1060        interval,
1061        component,
1062        std::iter::empty(),
1063    )?));
1064
1065    fn sample(
1066        mut store: StoreContextMut<Host>,
1067        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
1068    ) {
1069        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
1070        f(
1071            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
1072            store.as_context(),
1073        );
1074        store.data_mut().guest_profiler = Some(profiler);
1075    }
1076
1077    // Hostcall entry/exit, etc.
1078    store.call_hook(|store, kind| {
1079        sample(store, |profiler, store| profiler.call_hook(store, kind));
1080        Ok(())
1081    });
1082
1083    store.epoch_deadline_callback(move |store| {
1084        sample(store, |profiler, store| {
1085            profiler.sample(store, std::time::Duration::ZERO)
1086        });
1087
1088        Ok(UpdateDeadline::Continue(1))
1089    });
1090
1091    store.set_epoch_deadline(1);
1092
1093    let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
1094        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
1095            .expect("profiling doesn't support threads yet");
1096        if let Err(e) = std::fs::File::create(&path)
1097            .map_err(wasmtime::Error::new)
1098            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
1099        {
1100            eprintln!("failed writing profile at {path}: {e:#}");
1101        } else {
1102            eprintln!();
1103            eprintln!("Profile written to: {path}");
1104            eprintln!("View this profile at https://profiler.firefox.com/.");
1105        }
1106    });
1107
1108    Ok(write_profile)
1109}
1110
1111type Request = hyper::Request<hyper::body::Incoming>;
1112
1113async fn handle_client(
1114    client: tokio::net::TcpStream,
1115    handler: &ProxyHandler<HostHandlerState>,
1116    debuggee_store: Option<&mut Store<Host>>,
1117) {
1118    // Hyper's `service_fn` takes an `Fn` closure, so to bridge the need to
1119    // transfer a mutable store to each request for debugging a tokio mutex is
1120    // used. The tokio mutex is required as the returned future must also be
1121    // `Send`.
1122    let lock = &debuggee_store.map(tokio::sync::Mutex::new);
1123
1124    if let Err(e) = http1::Builder::new()
1125        .keep_alive(true)
1126        .serve_connection(
1127            TokioIo::new(client),
1128            hyper::service::service_fn(move |req| async move {
1129                let mut debuggee_store = match &lock {
1130                    Some(store) => Some(store.lock().await),
1131                    None => None,
1132                };
1133                let debuggee_store = debuggee_store.as_mut().map(|s| &mut ***s);
1134                match handle_request(handler, debuggee_store, req).await {
1135                    Ok(r) => Ok::<_, Infallible>(r),
1136                    Err(e) => {
1137                        eprintln!("error: {e:?}");
1138                        let error_html = "\
1139<!doctype html>
1140<html>
1141<head>
1142    <title>500 Internal Server Error</title>
1143</head>
1144<body>
1145    <center>
1146        <h1>500 Internal Server Error</h1>
1147        <hr>
1148        wasmtime
1149    </center>
1150</body>
1151</html>";
1152                        Ok(Response::builder()
1153                            .status(StatusCode::INTERNAL_SERVER_ERROR)
1154                            .header("Content-Type", "text/html; charset=UTF-8")
1155                            .body(
1156                                Full::new(bytes::Bytes::from(error_html))
1157                                    .map_err(|_| unreachable!())
1158                                    .boxed_unsync(),
1159                            )
1160                            .unwrap())
1161                    }
1162                }
1163            }),
1164        )
1165        .await
1166    {
1167        eprintln!("error: {e:?}");
1168    }
1169}
1170
1171async fn handle_request(
1172    handler: &ProxyHandler<HostHandlerState>,
1173    debuggee_store: Option<&mut Store<Host>>,
1174    mut req: Request,
1175) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
1176    use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode;
1177
1178    // This is used to throttle the maximum number of concurrent requests that
1179    // can be processed at any one point in time before delegating to
1180    // `handler.handle(...)` below.
1181    let _request_permit = handler.state().sem_requests.acquire().await?;
1182
1183    handler.state().request_headers.apply(req.headers_mut());
1184
1185    let request_id = handler
1186        .state()
1187        .next_request_id
1188        .fetch_add(1, Ordering::Relaxed);
1189    log::info!(
1190        "Received request {request_id}: {} {}",
1191        req.method(),
1192        req.uri()
1193    );
1194
1195    let req = req.map(|body| {
1196        body.map_err(ErrorCode::from_hyper_request_error)
1197            .map_err(handler::ErrorCode::from)
1198            .boxed_unsync()
1199    });
1200
1201    match debuggee_store {
1202        // For debugging go ahead and synchronously execute the instance here
1203        // in a single instance. This is debugging-specific to use the store
1204        // passed in.
1205        Some(store) => {
1206            let instance = handler.state().instantiate_into(store).await?;
1207            let (tx, rx) = futures::channel::oneshot::channel();
1208            let prepared = Prepared::new(
1209                store.as_context_mut(),
1210                &instance,
1211                req,
1212                handler.state().view(),
1213                tx,
1214            )?;
1215            store
1216                .run_concurrent(async |store| prepared.run(store, std::future::pending()).await)
1217                .await??;
1218            rx.await?
1219        }
1220
1221        // For when debugging is disabled delegate to the default handling path.
1222        None => handler.handle(request_id, req).await,
1223    }
1224}
1225
1226#[derive(Clone, Default)]
1227struct RequestHeaders {
1228    entries: Vec<(HeaderName, HeaderValue)>,
1229}
1230
1231impl RequestHeaders {
1232    fn parse(headers: &[String]) -> Result<Self> {
1233        let mut entries = Vec::new();
1234        for header in headers {
1235            if let Some(path) = header.strip_prefix('@') {
1236                let contents = std::fs::read_to_string(path)
1237                    .with_context(|| format!("failed to read header file `{path}`"))?;
1238                for line in contents.lines().filter(|line| !line.trim().is_empty()) {
1239                    entries.push(parse_header(line)?);
1240                }
1241            } else {
1242                entries.push(parse_header(header)?);
1243            }
1244        }
1245        Ok(Self { entries })
1246    }
1247
1248    fn apply(&self, headers: &mut HeaderMap) {
1249        // Remove all request-provided values before appending CLI-provided
1250        // values so repeated CLI headers with the same name are preserved.
1251        for name in self.entries.iter().map(|(name, _)| name) {
1252            headers.remove(name);
1253        }
1254        for (name, value) in &self.entries {
1255            headers.append(name, value.clone());
1256        }
1257    }
1258}
1259
1260fn parse_header(header: &str) -> Result<(HeaderName, HeaderValue)> {
1261    let (name, value) = header
1262        .split_once(':')
1263        .with_context(|| format!("header `{header}` is missing `:`"))?;
1264    let name = HeaderName::from_bytes(name.trim().as_bytes())
1265        .with_context(|| format!("invalid header name in header `{header}`"))?;
1266    let value = HeaderValue::from_str(value.trim_start())
1267        .with_context(|| format!("invalid header value in header `{header}`"))?;
1268    Ok((name, value))
1269}
1270
1271#[derive(Clone)]
1272enum Output {
1273    Stdout,
1274    Stderr,
1275}
1276
1277impl Output {
1278    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
1279        use std::io::Write;
1280
1281        match self {
1282            Output::Stdout => std::io::stdout().write_all(buf),
1283            Output::Stderr => std::io::stderr().write_all(buf),
1284        }
1285    }
1286}
1287
1288#[derive(Clone)]
1289struct LogStream {
1290    output: Output,
1291    state: Arc<LogStreamState>,
1292}
1293
1294struct LogStreamState {
1295    prefix: String,
1296    needs_prefix_on_next_write: AtomicBool,
1297}
1298
1299impl LogStream {
1300    fn new(prefix: String, output: Output) -> LogStream {
1301        LogStream {
1302            output,
1303            state: Arc::new(LogStreamState {
1304                prefix,
1305                needs_prefix_on_next_write: AtomicBool::new(true),
1306            }),
1307        }
1308    }
1309
1310    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
1311        while !bytes.is_empty() {
1312            if self
1313                .state
1314                .needs_prefix_on_next_write
1315                .load(Ordering::Relaxed)
1316            {
1317                self.output.write_all(self.state.prefix.as_bytes())?;
1318                self.state
1319                    .needs_prefix_on_next_write
1320                    .store(false, Ordering::Relaxed);
1321            }
1322            match bytes.iter().position(|b| *b == b'\n') {
1323                Some(i) => {
1324                    let (a, b) = bytes.split_at(i + 1);
1325                    bytes = b;
1326                    self.output.write_all(a)?;
1327                    self.state
1328                        .needs_prefix_on_next_write
1329                        .store(true, Ordering::Relaxed);
1330                }
1331                None => {
1332                    self.output.write_all(bytes)?;
1333                    break;
1334                }
1335            }
1336        }
1337
1338        Ok(())
1339    }
1340}
1341
1342impl wasmtime_wasi::cli::StdoutStream for LogStream {
1343    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
1344        Box::new(self.clone())
1345    }
1346    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
1347        Box::new(self.clone())
1348    }
1349}
1350
1351impl wasmtime_wasi::cli::IsTerminal for LogStream {
1352    fn is_terminal(&self) -> bool {
1353        match &self.output {
1354            Output::Stdout => std::io::stdout().is_terminal(),
1355            Output::Stderr => std::io::stderr().is_terminal(),
1356        }
1357    }
1358}
1359
1360impl wasmtime_wasi::p2::OutputStream for LogStream {
1361    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1362        self.write_all(&bytes)
1363            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1364        Ok(())
1365    }
1366
1367    fn flush(&mut self) -> StreamResult<()> {
1368        Ok(())
1369    }
1370
1371    fn check_write(&mut self) -> StreamResult<usize> {
1372        Ok(1024 * 1024)
1373    }
1374}
1375
1376#[async_trait::async_trait]
1377impl wasmtime_wasi::p2::Pollable for LogStream {
1378    async fn ready(&mut self) {}
1379}
1380
1381impl AsyncWrite for LogStream {
1382    fn poll_write(
1383        mut self: Pin<&mut Self>,
1384        _cx: &mut Context<'_>,
1385        buf: &[u8],
1386    ) -> Poll<io::Result<usize>> {
1387        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1388    }
1389    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1390        Poll::Ready(Ok(()))
1391    }
1392    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1393        Poll::Ready(Ok(()))
1394    }
1395}
1396
1397/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
1398/// try to use it when we can. The main cost of the pooling allocator, however,
1399/// is the virtual memory required to run it. Not all systems support the same
1400/// amount of virtual memory, for example some aarch64 and riscv64 configuration
1401/// only support 39 bits of virtual address space.
1402///
1403/// The pooling allocator, by default, will request 1000 linear memories each
1404/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
1405/// being about 42 bits of the address space. This exceeds the 39 bit limit of
1406/// some systems, so there the pooling allocator will fail by default.
1407///
1408/// This function attempts to dynamically determine the hint for the pooling
1409/// allocator. This returns `Some(true)` if the pooling allocator should be used
1410/// by default, or `None` or an error otherwise.
1411///
1412/// The method for testing this is to allocate a 0-sized 64-bit linear memory
1413/// with a maximum size that's N bits large where we force all memories to be
1414/// static. This should attempt to acquire N bits of the virtual address space.
1415/// If successful that should mean that the pooling allocator is OK to use, but
1416/// if it fails then the pooling allocator is not used and the normal mmap-based
1417/// implementation is used instead.
1418fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1419    use wasmtime::{Config, Memory, MemoryType};
1420    const BITS_TO_TEST: u32 = 42;
1421    let mut config = Config::new();
1422    config.wasm_memory64(true);
1423    config.memory_reservation(1 << BITS_TO_TEST);
1424    let engine = Engine::new(&config)?;
1425    let mut store = Store::new(&engine, ());
1426    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
1427    // page size here from the maximum size.
1428    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1429    if Memory::new(&mut store, ty).is_ok() {
1430        Ok(Some(true))
1431    } else {
1432        Ok(None)
1433    }
1434}