Skip to main content

wasmtime_cli/commands/
serve.rs

1use crate::common::{HttpHooks, Profile, RunCommon, RunTarget};
2use bytes::Bytes;
3use clap::Parser;
4use futures::future::FutureExt;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::UnsyncBoxBody;
8use hyper::body::{Body, Frame, SizeHint};
9use std::convert::Infallible;
10use std::ffi::OsString;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::{
15    path::PathBuf,
16    sync::{
17        Arc, Mutex,
18        atomic::{AtomicBool, Ordering},
19    },
20    time::Duration,
21};
22use tokio::io::{self, AsyncWrite};
23use tokio::sync::Notify;
24use wasmtime::component::{Component, Linker};
25use wasmtime::{
26    Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail, error::Context as _,
27};
28use wasmtime_cli_flags::opt::WasmtimeOptionValue;
29use wasmtime_wasi::p2::{StreamError, StreamResult};
30use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
31#[cfg(feature = "component-model-async")]
32use wasmtime_wasi_http::handler::p2::bindings as p2;
33use wasmtime_wasi_http::handler::{HandlerState, Proxy, ProxyHandler, ProxyPre, StoreBundle};
34use wasmtime_wasi_http::io::TokioIo;
35use wasmtime_wasi_http::{WasiHttpCtx, p2::WasiHttpView};
36
37#[cfg(feature = "debug")]
38use crate::commands::run::RunCommand;
39
40#[cfg(feature = "wasi-config")]
41use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
42#[cfg(feature = "wasi-keyvalue")]
43use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
44#[cfg(feature = "wasi-nn")]
45use wasmtime_wasi_nn::wit::WasiNnCtx;
46
/// Number of requests sent to a single WASIp3 component instance before it is
/// dropped, used when `max_instance_reuse_count` is not set on the command.
const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
/// Number of requests sent to a single WASIp2 component instance before it is
/// dropped, used when `max_instance_reuse_count` is not set on the command.
const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
/// Number of concurrent requests sent to a single WASIp3 component instance,
/// used when `max_instance_concurrent_reuse_count` is not set on the command.
const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
50
51struct Host {
52    table: wasmtime::component::ResourceTable,
53    ctx: WasiCtx,
54    http: WasiHttpCtx,
55    hooks: HttpHooks,
56
57    limits: StoreLimits,
58
59    #[cfg(feature = "wasi-nn")]
60    nn: Option<WasiNnCtx>,
61
62    #[cfg(feature = "wasi-config")]
63    wasi_config: Option<WasiConfigVariables>,
64
65    #[cfg(feature = "wasi-keyvalue")]
66    wasi_keyvalue: Option<WasiKeyValueCtx>,
67
68    #[cfg(feature = "profiling")]
69    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
70}
71
72impl WasiView for Host {
73    fn ctx(&mut self) -> WasiCtxView<'_> {
74        WasiCtxView {
75            ctx: &mut self.ctx,
76            table: &mut self.table,
77        }
78    }
79}
80
81impl wasmtime_wasi_http::p2::WasiHttpView for Host {
82    fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
83        wasmtime_wasi_http::p2::WasiHttpCtxView {
84            ctx: &mut self.http,
85            table: &mut self.table,
86            hooks: &mut self.hooks,
87        }
88    }
89}
90
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
    /// Lends out the shared resource table, the wasi-http context, and the
    /// HTTP hooks as a p3 view.
    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
        // Split the borrow so all three fields can be handed out mutably.
        let Self {
            table, http, hooks, ..
        } = self;
        wasmtime_wasi_http::p3::WasiHttpCtxView {
            table,
            ctx: http,
            hooks,
        }
    }
}
101
/// Default value of the `addr` option: every IPv4 interface (`0.0.0.0`) on
/// port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::V4(
    std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, 8080),
);
106
107fn parse_duration(s: &str) -> Result<Duration, String> {
108    Duration::parse(Some(s)).map_err(|e| e.to_string())
109}
110
111/// Runs a WebAssembly module
112#[derive(Parser)]
113pub struct ServeCommand {
114    #[command(flatten)]
115    run: RunCommon,
116
117    /// Socket address for the web server to bind to.
118    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
119    addr: SocketAddr,
120
121    /// Socket address where, when connected to, will initiate a graceful
122    /// shutdown.
123    ///
124    /// Note that graceful shutdown is also supported on ctrl-c.
125    #[arg(long, value_name = "SOCKADDR")]
126    shutdown_addr: Option<SocketAddr>,
127
128    /// Disable log prefixes of wasi-http handlers.
129    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
130    #[arg(long)]
131    no_logging_prefix: bool,
132
133    /// The WebAssembly component to run.
134    #[arg(value_name = "WASM", required = true)]
135    component: PathBuf,
136
137    /// Maximum number of requests to send to a single component instance before
138    /// dropping it.
139    ///
140    /// This defaults to 1 for WASIp2 components and 128 for WASIp3 components.
141    #[arg(long)]
142    max_instance_reuse_count: Option<usize>,
143
144    /// Maximum number of concurrent requests to send to a single component
145    /// instance.
146    ///
147    /// This defaults to 1 for WASIp2 components and 16 for WASIp3 components.
148    /// Note that setting it to more than 1 will have no effect for WASIp2
149    /// components since they cannot be called concurrently.
150    #[arg(long)]
151    max_instance_concurrent_reuse_count: Option<usize>,
152
153    /// Time to hold an idle component instance for possible reuse before
154    /// dropping it.
155    ///
156    /// A number with no suffix or with an `s` suffix is interpreted as seconds;
157    /// other accepted suffixes include `ms` (milliseconds), `us` or `μs`
158    /// (microseconds), and `ns` (nanoseconds).
159    #[arg(long, default_value = "1s", value_parser = parse_duration)]
160    idle_instance_timeout: Duration,
161}
162
163impl ServeCommand {
164    /// Start a server to run the given wasi-http proxy component
165    pub fn execute(mut self) -> Result<()> {
166        self.run.common.init_logging()?;
167
168        // We force cli errors before starting to listen for connections so then
169        // we don't accidentally delay them to the first request.
170
171        if self.run.common.wasi.nn == Some(true) {
172            #[cfg(not(feature = "wasi-nn"))]
173            {
174                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
175            }
176        }
177
178        if self.run.common.wasi.threads == Some(true) {
179            bail!("wasi-threads does not support components yet")
180        }
181
182        // The serve command requires both wasi-http and the component model, so
183        // we enable those by default here.
184        if self.run.common.wasi.http.replace(true) == Some(false) {
185            bail!("wasi-http is required for the serve command, and must not be disabled");
186        }
187        if self.run.common.wasm.component_model.replace(true) == Some(false) {
188            bail!("components are required for the serve command, and must not be disabled");
189        }
190
191        let runtime = tokio::runtime::Builder::new_multi_thread()
192            .enable_time()
193            .enable_io()
194            .build()?;
195
196        runtime.block_on(self.serve())?;
197
198        Ok(())
199    }
200
201    /// Set up the debugger component side-car, mirroring
202    /// [`RunCommand::debugger_run`].
203    #[cfg(feature = "debug")]
204    fn debugger_setup(&mut self) -> Result<Option<RunCommand>> {
205        fn set_implicit_option(
206            place: &str,
207            name: &str,
208            setting: &mut Option<bool>,
209            value: bool,
210        ) -> Result<()> {
211            if *setting == Some(!value) {
212                bail!(
213                    "Explicitly-set option on {place} {name}={} is not compatible \
214                     with debugging-implied setting {value}",
215                    setting.unwrap()
216                );
217            }
218            *setting = Some(value);
219            Ok(())
220        }
221
222        #[cfg(feature = "gdbstub")]
223        let override_bytes = if let Some(addr) = self.run.gdbstub.as_deref() {
224            if self.run.common.debug.debugger.is_some() {
225                bail!("-g/--gdb cannot be combined with -Ddebugger=");
226            }
227            let addr = if addr.parse::<u16>().is_ok() {
228                format!("127.0.0.1:{addr}")
229            } else {
230                use std::net::SocketAddr as SA;
231                addr.parse::<SA>()
232                    .with_context(|| format!("invalid gdbstub address: `{addr}`"))?;
233                addr.to_string()
234            };
235            self.run.common.debug.debugger = Some("<built-in gdbstub>".into());
236            self.run.common.debug.arg.push(addr);
237            Some(gdbstub_component_artifact::GDBSTUB_COMPONENT)
238        } else {
239            None
240        };
241        #[cfg(not(feature = "gdbstub"))]
242        let override_bytes = None;
243
244        if let Some(debugger_component_path) = self.run.common.debug.debugger.as_ref() {
245            set_implicit_option(
246                "debuggee",
247                "guest_debug",
248                &mut self.run.common.debug.guest_debug,
249                true,
250            )?;
251            set_implicit_option(
252                "debuggee",
253                "epoch_interruption",
254                &mut self.run.common.wasm.epoch_interruption,
255                true,
256            )?;
257
258            let mut debugger_run = RunCommand::try_parse_from(
259                ["run".into(), debugger_component_path.into()]
260                    .into_iter()
261                    .chain(self.run.common.debug.arg.iter().map(OsString::from)),
262            )?;
263            debugger_run.module_bytes = override_bytes;
264
265            debugger_run.run.common.wasi.tcp.get_or_insert(true);
266            debugger_run
267                .run
268                .common
269                .wasi
270                .inherit_network
271                .get_or_insert(true);
272
273            set_implicit_option(
274                "debugger",
275                "inherit_stdin",
276                &mut debugger_run.run.common.wasi.inherit_stdin,
277                self.run.common.debug.inherit_stdin.unwrap_or(false),
278            )?;
279            set_implicit_option(
280                "debugger",
281                "inherit_stdout",
282                &mut debugger_run.run.common.wasi.inherit_stdout,
283                self.run.common.debug.inherit_stdout.unwrap_or(false),
284            )?;
285            set_implicit_option(
286                "debugger",
287                "inherit_stderr",
288                &mut debugger_run.run.common.wasi.inherit_stderr,
289                self.run.common.debug.inherit_stderr.unwrap_or(false),
290            )?;
291            Ok(Some(debugger_run))
292        } else {
293            Ok(None)
294        }
295    }
296
297    /// Run the HTTP server under a debugger component.
298    ///
299    /// Uses a single store and instance to handle all requests
300    /// sequentially, so the debugger can pause and inspect state.
301    #[cfg(feature = "debug")]
302    async fn serve_under_debugger(
303        &self,
304        mut debug_run: RunCommand,
305        engine: &Engine,
306        linker: &Linker<Host>,
307        component: &Component,
308    ) -> Result<()> {
309        let instance_pre = linker.instantiate_pre(component)?;
310        let proxy_pre = wasmtime_wasi_http::p2::bindings::ProxyPre::new(instance_pre)?;
311
312        let mut debuggee_store = self.new_store(engine, None)?;
313
314        // Pre-register component modules so the debugger can see
315        // them and set breakpoints at the initial stop.
316        debuggee_store.debug_register_component(component)?;
317
318        let debug_engine = debug_run.new_engine()?;
319        let debug_main = debug_run.run.load_module(
320            &debug_engine,
321            debug_run.module_and_args[0].as_ref(),
322            debug_run.module_bytes.as_ref().map(|v| &v[..]),
323        )?;
324        let (mut debug_store, debug_linker) =
325            debug_run.new_store_and_linker(&debug_engine, &debug_main)?;
326        let debug_component = match debug_main {
327            RunTarget::Core(_) => {
328                bail!("Debugger component is a core module; only components are supported")
329            }
330            RunTarget::Component(c) => c,
331        };
332        let mut debug_linker = match debug_linker {
333            crate::commands::run::CliLinker::Core(_) => unreachable!(),
334            crate::commands::run::CliLinker::Component(l) => l,
335        };
336        debug_run.add_debugger_api(&mut debug_linker)?;
337
338        let addr = self.addr;
339        debug_run
340            .invoke_debugger(
341                &mut debug_store,
342                &debug_component,
343                &mut debug_linker,
344                debuggee_store,
345                move |store| Box::pin(debug_serve_body(store, proxy_pre, addr)),
346            )
347            .await
348    }
349
350    fn new_store(&self, engine: &Engine, req_id: Option<u64>) -> Result<Store<Host>> {
351        let mut builder = WasiCtxBuilder::new();
352        self.run.configure_wasip2(&mut builder)?;
353
354        if let Some(req_id) = req_id {
355            builder.env("REQUEST_ID", req_id.to_string());
356        }
357
358        let stdout_prefix: String;
359        let stderr_prefix: String;
360        match req_id {
361            Some(req_id) if !self.no_logging_prefix => {
362                stdout_prefix = format!("stdout [{req_id}] :: ");
363                stderr_prefix = format!("stderr [{req_id}] :: ");
364            }
365            _ => {
366                stdout_prefix = "".to_string();
367                stderr_prefix = "".to_string();
368            }
369        }
370        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
371        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
372
373        let mut table = wasmtime::component::ResourceTable::new();
374        if let Some(max) = self.run.common.wasi.max_resources {
375            table.set_max_capacity(max);
376        }
377        let mut host = Host {
378            table,
379            ctx: builder.build(),
380            http: self.run.wasi_http_ctx()?,
381            hooks: self.run.wasi_http_hooks(),
382
383            limits: StoreLimits::default(),
384
385            #[cfg(feature = "wasi-nn")]
386            nn: None,
387            #[cfg(feature = "wasi-config")]
388            wasi_config: None,
389            #[cfg(feature = "wasi-keyvalue")]
390            wasi_keyvalue: None,
391            #[cfg(feature = "profiling")]
392            guest_profiler: None,
393        };
394
395        if self.run.common.wasi.nn == Some(true) {
396            #[cfg(feature = "wasi-nn")]
397            {
398                let graphs = self
399                    .run
400                    .common
401                    .wasi
402                    .nn_graph
403                    .iter()
404                    .map(|g| (g.format.clone(), g.dir.clone()))
405                    .collect::<Vec<_>>();
406                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
407                host.nn.replace(WasiNnCtx::new(backends, registry));
408            }
409        }
410
411        if self.run.common.wasi.config == Some(true) {
412            #[cfg(feature = "wasi-config")]
413            {
414                let vars = WasiConfigVariables::from_iter(
415                    self.run
416                        .common
417                        .wasi
418                        .config_var
419                        .iter()
420                        .map(|v| (v.key.clone(), v.value.clone())),
421                );
422                host.wasi_config.replace(vars);
423            }
424        }
425
426        if self.run.common.wasi.keyvalue == Some(true) {
427            #[cfg(feature = "wasi-keyvalue")]
428            {
429                let ctx = WasiKeyValueCtxBuilder::new()
430                    .in_memory_data(
431                        self.run
432                            .common
433                            .wasi
434                            .keyvalue_in_memory_data
435                            .iter()
436                            .map(|v| (v.key.clone(), v.value.clone())),
437                    )
438                    .build();
439                host.wasi_keyvalue.replace(ctx);
440            }
441        }
442
443        let mut store = Store::new(engine, host);
444
445        if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
446            store.set_hostcall_fuel(fuel);
447        }
448
449        store.data_mut().limits = self.run.store_limits();
450        store.limiter(|t| &mut t.limits);
451
452        // If fuel has been configured, we want to add the configured
453        // fuel amount to this store.
454        if let Some(fuel) = self.run.common.wasm.fuel {
455            store.set_fuel(fuel)?;
456        }
457
458        Ok(store)
459    }
460
461    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
462        self.run.validate_p3_option()?;
463        let cli = self.run.validate_cli_enabled()?;
464
465        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
466        // to serve as a signal to enable all WASI interfaces instead of just
467        // those in the `proxy` world. If `-Scli` is present then add all
468        // `command` APIs and then additionally add in the required HTTP APIs.
469        //
470        // If `-Scli` isn't passed then use the `add_to_linker_async`
471        // bindings which adds just those interfaces that the proxy interface
472        // uses.
473        if cli == Some(true) {
474            self.run.add_wasmtime_wasi_to_linker(linker)?;
475            wasmtime_wasi_http::p2::add_only_http_to_linker_async(linker)?;
476            #[cfg(feature = "component-model-async")]
477            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
478                wasmtime_wasi_http::p3::add_to_linker(linker)?;
479            }
480        } else {
481            wasmtime_wasi_http::p2::add_to_linker_async(linker)?;
482            #[cfg(feature = "component-model-async")]
483            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
484                wasmtime_wasi_http::p3::add_to_linker(linker)?;
485                wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
486                wasmtime_wasi::p3::random::add_to_linker(linker)?;
487                wasmtime_wasi::p3::cli::add_to_linker(linker)?;
488            }
489        }
490
491        if self.run.common.wasi.nn == Some(true) {
492            #[cfg(not(feature = "wasi-nn"))]
493            {
494                bail!("support for wasi-nn was disabled at compile time");
495            }
496            #[cfg(feature = "wasi-nn")]
497            {
498                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
499                    let ctx = h.nn.as_mut().unwrap();
500                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
501                })?;
502            }
503        }
504
505        if self.run.common.wasi.config == Some(true) {
506            #[cfg(not(feature = "wasi-config"))]
507            {
508                bail!("support for wasi-config was disabled at compile time");
509            }
510            #[cfg(feature = "wasi-config")]
511            {
512                wasmtime_wasi_config::add_to_linker(linker, |h| {
513                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
514                })?;
515            }
516        }
517
518        if self.run.common.wasi.keyvalue == Some(true) {
519            #[cfg(not(feature = "wasi-keyvalue"))]
520            {
521                bail!("support for wasi-keyvalue was disabled at compile time");
522            }
523            #[cfg(feature = "wasi-keyvalue")]
524            {
525                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
526                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
527                })?;
528            }
529        }
530
531        if self.run.common.wasi.threads == Some(true) {
532            bail!("support for wasi-threads is not available with components");
533        }
534
535        if self.run.common.wasi.http == Some(false) {
536            bail!("support for wasi-http must be enabled for `serve` subcommand");
537        }
538
539        Ok(())
540    }
541
542    async fn serve(mut self) -> Result<()> {
543        use hyper::server::conn::http1;
544
545        #[cfg(feature = "debug")]
546        let debug_run = self.debugger_setup()?;
547
548        let mut config = self
549            .run
550            .common
551            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
552        config.wasm_component_model(true);
553
554        if self.run.common.wasm.timeout.is_some() {
555            config.epoch_interruption(true);
556        }
557
558        match self.run.profile {
559            Some(Profile::Native(s)) => {
560                config.profiler(s);
561            }
562            Some(Profile::Guest { .. }) => {
563                config.epoch_interruption(true);
564            }
565            None => {}
566        }
567
568        let engine = Engine::new(&config)?;
569        let mut linker = Linker::new(&engine);
570
571        self.add_to_linker(&mut linker)?;
572
573        let component = match self.run.load_module(&engine, &self.component, None)? {
574            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
575            RunTarget::Component(c) => c,
576        };
577
578        #[cfg(feature = "debug")]
579        if let Some(debug_run) = debug_run {
580            return self
581                .serve_under_debugger(debug_run, &engine, &linker, &component)
582                .await;
583        }
584
585        let instance = linker.instantiate_pre(&component)?;
586        #[cfg(feature = "component-model-async")]
587        let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
588            Ok(pre) => ProxyPre::P3(pre),
589            Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
590        };
591        #[cfg(not(feature = "component-model-async"))]
592        let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
593
594        // Spawn background task(s) waiting for graceful shutdown signals. This
595        // always listens for ctrl-c but additionally can listen for a TCP
596        // connection to the specified address.
597        let shutdown = Arc::new(GracefulShutdown::default());
598        tokio::task::spawn({
599            let shutdown = shutdown.clone();
600            async move {
601                tokio::signal::ctrl_c().await.unwrap();
602                shutdown.requested.notify_one();
603            }
604        });
605        if let Some(addr) = self.shutdown_addr {
606            let listener = tokio::net::TcpListener::bind(addr).await?;
607            eprintln!(
608                "Listening for shutdown on tcp://{}/",
609                listener.local_addr()?
610            );
611            let shutdown = shutdown.clone();
612            tokio::task::spawn(async move {
613                let _ = listener.accept().await;
614                shutdown.requested.notify_one();
615            });
616        }
617
618        let socket = match &self.addr {
619            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
620            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
621        };
622        // Conditionally enable `SO_REUSEADDR` depending on the current
623        // platform. On Unix we want this to be able to rebind an address in
624        // the `TIME_WAIT` state which can happen then a server is killed with
625        // active TCP connections and then restarted. On Windows though if
626        // `SO_REUSEADDR` is specified then it enables multiple applications to
627        // bind the port at the same time which is not something we want. Hence
628        // this is conditionally set based on the platform (and deviates from
629        // Tokio's default from always-on).
630        socket.set_reuseaddr(!cfg!(windows))?;
631        socket.bind(self.addr)?;
632        let listener = socket.listen(100)?;
633
634        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
635
636        log::info!("Listening on {}", self.addr);
637
638        let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
639            Some(interval)
640        } else if let Some(t) = self.run.common.wasm.timeout {
641            Some(EPOCH_INTERRUPT_PERIOD.min(t))
642        } else {
643            None
644        };
645        let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));
646
647        let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
648            if let ProxyPre::P3(_) = &instance {
649                DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
650            } else {
651                DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
652            }
653        });
654
655        let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
656            self.max_instance_concurrent_reuse_count
657                .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
658        } else {
659            1
660        };
661
662        let handler = ProxyHandler::new(
663            HostHandlerState {
664                cmd: self,
665                engine,
666                component,
667                max_instance_reuse_count,
668                max_instance_concurrent_reuse_count,
669                // Give one shutdown guard to this handler which will track the
670                // full lifetime of any instances spawned.
671                _shutdown_guard: Box::new(shutdown.clone().increment()),
672            },
673            instance,
674        );
675
676        loop {
677            // Wait for a socket, but also "race" against shutdown to break out
678            // of this loop. Once the graceful shutdown signal is received then
679            // this loop exits immediately.
680            let (stream, _) = tokio::select! {
681                _ = shutdown.requested.notified() => break,
682                v = listener.accept() => v?,
683            };
684
685            // The Nagle algorithm can impose a significant latency penalty
686            // (e.g. 40ms on Linux) on guests which write small, intermittent
687            // response body chunks (e.g. SSE streams).  Here we disable that
688            // algorithm and rely on the guest to buffer if appropriate to avoid
689            // TCP fragmentation.
690            stream.set_nodelay(true)?;
691
692            let stream = TokioIo::new(stream);
693            let h = handler.clone();
694
695            // In addition to the shutdown guard given to the handler above,
696            // also give one to the tokio tasks doing HTTP I/O as well to ensure
697            // it keeps them alive too.
698            let shutdown_guard = shutdown.clone().increment();
699            tokio::task::spawn(async move {
700                if let Err(e) = http1::Builder::new()
701                    .keep_alive(true)
702                    .serve_connection(
703                        stream,
704                        hyper::service::service_fn(move |req| {
705                            let h = h.clone();
706                            async move {
707                                use http_body_util::{BodyExt, Full};
708                                match handle_request(h, req).await {
709                                    Ok(r) => Ok::<_, Infallible>(r),
710                                    Err(e) => {
711                                        eprintln!("error: {e:?}");
712                                        let error_html = "\
713<!doctype html>
714<html>
715<head>
716    <title>500 Internal Server Error</title>
717</head>
718<body>
719    <center>
720        <h1>500 Internal Server Error</h1>
721        <hr>
722        wasmtime
723    </center>
724</body>
725</html>";
726                                        Ok(Response::builder()
727                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
728                                            .header("Content-Type", "text/html; charset=UTF-8")
729                                            .body(
730                                                Full::new(bytes::Bytes::from(error_html))
731                                                    .map_err(|_| unreachable!())
732                                                    .boxed_unsync(),
733                                            )
734                                            .unwrap())
735                                    }
736                                }
737                            }
738                        }),
739                    )
740                    .await
741                {
742                    eprintln!("error: {e:?}");
743                }
744                drop(shutdown_guard);
745            });
746        }
747
748        drop(handler);
749
750        // Upon exiting the loop we'll no longer process any more incoming
751        // connections but there may still be outstanding connections
752        // processing in child tasks. If there are wait for those to complete
753        // before shutting down completely. Also enable short-circuiting this
754        // wait with a second ctrl-c signal.
755        if shutdown.close() {
756            return Ok(());
757        }
758        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
759        tokio::select! {
760            _ = tokio::signal::ctrl_c() => {}
761            _ = shutdown.complete.notified() => {}
762        }
763
764        Ok(())
765    }
766}
767
768struct HostHandlerState {
769    cmd: ServeCommand,
770    engine: Engine,
771    component: Component,
772    max_instance_reuse_count: usize,
773    max_instance_concurrent_reuse_count: usize,
774    _shutdown_guard: Box<dyn std::any::Any + Send + Sync>,
775}
776
777impl HandlerState for HostHandlerState {
778    type StoreData = Host;
779
780    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Host>> {
781        let mut store = self.cmd.new_store(&self.engine, req_id)?;
782        let write_profile = setup_epoch_handler(&self.cmd, &mut store, self.component.clone())?;
783
784        Ok(StoreBundle {
785            store,
786            write_profile,
787        })
788    }
789
790    fn request_timeout(&self) -> Duration {
791        self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX)
792    }
793
794    fn idle_instance_timeout(&self) -> Duration {
795        self.cmd.idle_instance_timeout
796    }
797
798    fn max_instance_reuse_count(&self) -> usize {
799        self.max_instance_reuse_count
800    }
801
802    fn max_instance_concurrent_reuse_count(&self) -> usize {
803        self.max_instance_concurrent_reuse_count
804    }
805
806    fn handle_worker_error(&self, error: wasmtime::Error) {
807        eprintln!("worker error: {error}");
808    }
809}
810
811/// Helper structure to manage graceful shutdown int he accept loop above.
812#[derive(Default)]
813struct GracefulShutdown {
814    /// Async notification that shutdown has been requested.
815    requested: Notify,
816    /// Async notification that shutdown has completed, signaled when
817    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
818    complete: Notify,
819    /// Internal state related to what's in progress when shutdown is requested.
820    state: Mutex<GracefulShutdownState>,
821}
822
/// Mutable bookkeeping behind `GracefulShutdown::state`.
#[derive(Default)]
struct GracefulShutdownState {
    /// Number of task guards that are currently alive.
    active_tasks: u32,
    /// Set by `close`; once `true`, the guard that brings `active_tasks` to
    /// zero signals completion.
    notify_when_done: bool,
}
828
829impl GracefulShutdown {
830    /// Increments the number of active tasks and returns a guard indicating
831    fn increment(self: Arc<Self>) -> impl Drop + Send + Sync {
832        struct Guard(Arc<GracefulShutdown>);
833
834        let mut state = self.state.lock().unwrap();
835        assert!(!state.notify_when_done);
836        state.active_tasks += 1;
837        drop(state);
838
839        return Guard(self);
840
841        impl Drop for Guard {
842            fn drop(&mut self) {
843                let mut state = self.0.state.lock().unwrap();
844                state.active_tasks -= 1;
845                if state.notify_when_done && state.active_tasks == 0 {
846                    self.0.complete.notify_one();
847                }
848            }
849        }
850    }
851
852    /// Flags this state as done spawning tasks and returns whether there are no
853    /// more child tasks remaining.
854    fn close(&self) -> bool {
855        let mut state = self.state.lock().unwrap();
856        state.notify_when_done = true;
857        state.active_tasks == 0
858    }
859}
860
/// Period between epoch interrupts when a timeout is enabled, i.e. how often
/// running guests are interrupted to check for timeouts. When guest profiling
/// is enabled the guest epoch period is used instead.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
865
/// Handle to a background thread that periodically increments an engine's
/// epoch; dropping the handle stops and joins the thread.
struct EpochThread {
    /// Flag the background thread checks before each sleep; set on drop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the background thread, taken out when dropping.
    handle: Option<std::thread::JoinHandle<()>>,
}
870
871impl EpochThread {
872    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
873        let shutdown = Arc::new(AtomicBool::new(false));
874        let handle = {
875            let shutdown = Arc::clone(&shutdown);
876            let handle = std::thread::spawn(move || {
877                while !shutdown.load(Ordering::Relaxed) {
878                    std::thread::sleep(interval);
879                    engine.increment_epoch();
880                }
881            });
882            Some(handle)
883        };
884
885        EpochThread { shutdown, handle }
886    }
887}
888
889impl Drop for EpochThread {
890    fn drop(&mut self) {
891        if let Some(handle) = self.handle.take() {
892            self.shutdown.store(true, Ordering::Relaxed);
893            handle.join().unwrap();
894        }
895    }
896}
897
898type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
899
900fn setup_epoch_handler(
901    cmd: &ServeCommand,
902    store: &mut Store<Host>,
903    component: Component,
904) -> Result<WriteProfile> {
905    // Profiling Enabled
906    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
907        #[cfg(feature = "profiling")]
908        return setup_guest_profiler(store, path.clone(), *interval, component.clone());
909        #[cfg(not(feature = "profiling"))]
910        {
911            let _ = (path, interval);
912            bail!("support for profiling disabled at compile time!");
913        }
914    }
915
916    // Profiling disabled but there's a global request timeout
917    if cmd.run.common.wasm.timeout.is_some() {
918        store.epoch_deadline_async_yield_and_update(1);
919    }
920
921    Ok(Box::new(|_store| {}))
922}
923
/// Installs a guest profiler on `store` and returns the callback that writes
/// the collected profile to `path`.
///
/// The profiler is stored in the store's `Host` data. It is fed from two
/// hooks registered here: the call hook (host call entry/exit and similar
/// transitions) and the epoch deadline callback, which takes a sample on
/// every epoch tick and re-arms the deadline.
///
/// # Errors
///
/// Fails if the profiler cannot be created for `component`.
///
/// # Panics
///
/// The installed hooks and the returned callback panic if the profiler is
/// missing from the store data or is shared with another owner.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<WriteProfile> {
    use std::io::Write as _;
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        store.engine(),
        module_name,
        interval,
        component,
        std::iter::empty(),
    )?));

    /// Temporarily takes the profiler out of the store so that `f` can be
    /// given mutable access to it alongside a shared view of the store.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    // Take a sample on every epoch tick and keep the guest running.
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, Duration::ZERO)
        });

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);

    let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        let written = std::fs::File::create(&path)
            .map_err(wasmtime::Error::new)
            .and_then(|file| {
                let mut output = std::io::BufWriter::new(file);
                profiler.finish(&mut output)?;
                // Flush explicitly: dropping a `BufWriter` discards any
                // error from the final write, which would otherwise be
                // reported as a successfully written profile.
                output.flush().map_err(wasmtime::Error::new)
            });
        match written {
            Err(e) => eprintln!("failed writing profile at {path}: {e:#}"),
            Ok(()) => {
                eprintln!();
                eprintln!("Profile written to: {path}");
                eprintln!("View this profile at https://profiler.firefox.com/.");
            }
        }
    });

    Ok(write_profile)
}
988
989/// Build a minimal error response with an empty body.
990fn error_response(status: StatusCode) -> hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>> {
991    Response::builder()
992        .status(status)
993        .body(
994            http_body_util::Empty::new()
995                .map_err(|_| unreachable!())
996                .boxed_unsync(),
997        )
998        .unwrap()
999}
1000
/// Debuggee body for `wasmtime serve -g`: instantiate the HTTP component
/// once, then handle requests sequentially on a single store.
///
/// Connections are accepted one at a time on `addr`. Each connection is
/// served over HTTP/1 with keep-alive, and every request on it is forwarded
/// through a channel to this function, which runs the guest handler on
/// `store` and sends the response back.
///
/// # Errors
///
/// Returns an error if instantiation fails, if the listener cannot be set
/// up, or if accepting or configuring a connection fails. Failures while
/// handling an individual request are printed to stderr and turned into
/// error responses instead.
///
/// NOTE(review): while the guest handler is being awaited the connection
/// future is not polled; confirm that guests which stream large request or
/// response bodies cannot stall here.
#[cfg(feature = "debug")]
async fn debug_serve_body(
    store: &mut Store<Host>,
    proxy_pre: wasmtime_wasi_http::p2::bindings::ProxyPre<Host>,
    addr: SocketAddr,
) -> Result<()> {
    use hyper::server::conn::http1;
    use wasmtime_wasi_http::p2::bindings::http::types::Scheme;
    use wasmtime_wasi_http::p2::body::HyperOutgoingBody;

    type P2Response = std::result::Result<
        hyper::Response<HyperOutgoingBody>,
        wasmtime_wasi_http::p2::bindings::http::types::ErrorCode,
    >;

    // Tick the engine's epoch every millisecond so the guest yields
    // regularly. The guard stops and joins the thread when this function
    // returns, so an early error return does not leave it running forever.
    let _epoch_thread = EpochThread::spawn(Duration::from_millis(1), store.engine().clone());

    store.epoch_deadline_async_yield_and_update(1);

    // Instantiate the HTTP component once.
    let proxy = proxy_pre.instantiate_async(&mut *store).await?;

    // Bind the TCP listener.
    let socket = match addr {
        SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
        SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
    };
    socket.set_reuseaddr(!cfg!(windows))?;
    socket.bind(addr)?;
    let listener = socket.listen(100)?;
    eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

    // Accept loop: handle one connection at a time, requests sequentially.
    loop {
        let (stream, _) = listener.accept().await?;
        stream.set_nodelay(true)?;
        let stream = TokioIo::new(stream);

        // Channel to bridge hyper's service_fn with our sequential
        // request processing on the single store.
        type RespBody = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
        let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::<(
            hyper::Request<hyper::body::Incoming>,
            tokio::sync::oneshot::Sender<std::result::Result<RespBody, Infallible>>,
        )>(1);

        let serve_conn = http1::Builder::new().keep_alive(true).serve_connection(
            stream,
            hyper::service::service_fn(move |req| {
                let req_tx = req_tx.clone();
                async move {
                    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
                    if req_tx.send((req, resp_tx)).await.is_err() {
                        return Ok::<_, Infallible>(error_response(
                            StatusCode::SERVICE_UNAVAILABLE,
                        ));
                    }
                    // Only build the fallback response if the sender was
                    // dropped without answering.
                    resp_rx
                        .await
                        .unwrap_or_else(|_| Ok(error_response(StatusCode::SERVICE_UNAVAILABLE)))
                }
            }),
        );

        tokio::pin!(serve_conn);

        loop {
            tokio::select! {
                result = &mut serve_conn => {
                    if let Err(e) = result {
                        eprintln!("connection error: {e:?}");
                    }
                    break;
                }
                msg = req_rx.recv() => {
                    let Some((req, resp_tx)) = msg else { break };

                    // Create the guest-visible request and the out-parameter
                    // through which the guest delivers its response.
                    let (p2_tx, p2_rx) = tokio::sync::oneshot::channel::<P2Response>();
                    let wasi_req = store
                        .data_mut()
                        .http()
                        .new_incoming_request(Scheme::Http, req);
                    let wasi_out = wasi_req.and_then(|req| {
                        let out = store.data_mut().http().new_response_outparam(p2_tx);
                        out.map(|out| (req, out))
                    });
                    let (wasi_req, wasi_out) = match wasi_out {
                        Ok(pair) => pair,
                        Err(e) => {
                            eprintln!("error creating WASI request: {e:?}");
                            let _ = resp_tx.send(Ok(error_response(
                                StatusCode::INTERNAL_SERVER_ERROR,
                            )));
                            continue;
                        }
                    };

                    if let Err(e) = proxy
                        .wasi_http_incoming_handler()
                        .call_handle(&mut *store, wasi_req, wasi_out)
                        .await
                    {
                        eprintln!("handler error: {e:?}");
                    }

                    // A dropped sender or an error from the component both
                    // become a 500 response.
                    let resp = match p2_rx.await {
                        Ok(Ok(resp)) => resp.map(|body| {
                            body.map_err(|e| e.into()).boxed_unsync()
                        }),
                        Ok(Err(e)) => {
                            eprintln!("component error: {e:?}");
                            error_response(StatusCode::INTERNAL_SERVER_ERROR)
                        }
                        Err(_) => error_response(StatusCode::INTERNAL_SERVER_ERROR),
                    };
                    let _ = resp_tx.send(Ok(resp));
                }
            }
        }
    }
}
1130
1131type Request = hyper::Request<hyper::body::Incoming>;
1132
1133async fn handle_request(
1134    handler: ProxyHandler<HostHandlerState>,
1135    req: Request,
1136) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
1137    use tokio::sync::oneshot;
1138
1139    let req_id = handler.next_req_id();
1140
1141    log::info!(
1142        "Request {req_id} handling {} to {}",
1143        req.method(),
1144        req.uri()
1145    );
1146
1147    // Here we must declare different channel types for p2 and p3 since p2's
1148    // `WasiHttpView::new_response_outparam` expects a specific kind of sender
1149    // that uses `p2::http::types::ErrorCode`, and we don't want to have to
1150    // convert from the p3 `ErrorCode` to the p2 one, only to convert again to
1151    // `wasmtime::Error`.
1152
1153    type P2Response = Result<
1154        hyper::Response<wasmtime_wasi_http::p2::body::HyperOutgoingBody>,
1155        p2::http::types::ErrorCode,
1156    >;
1157    type P3Response = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
1158
1159    enum Sender {
1160        P2(oneshot::Sender<P2Response>),
1161        P3(oneshot::Sender<P3Response>),
1162    }
1163
1164    enum Receiver {
1165        P2(oneshot::Receiver<P2Response>),
1166        P3(oneshot::Receiver<P3Response>),
1167    }
1168
1169    let (tx, rx) = match handler.instance_pre() {
1170        ProxyPre::P2(_) => {
1171            let (tx, rx) = oneshot::channel();
1172            (Sender::P2(tx), Receiver::P2(rx))
1173        }
1174        ProxyPre::P3(_) => {
1175            let (tx, rx) = oneshot::channel();
1176            (Sender::P3(tx), Receiver::P3(rx))
1177        }
1178    };
1179
1180    handler.spawn(
1181        if handler.state().max_instance_reuse_count() == 1 {
1182            Some(req_id)
1183        } else {
1184            None
1185        },
1186        Box::new(move |store, proxy| {
1187            Box::pin(
1188                async move {
1189                    match proxy {
1190                        Proxy::P2(proxy) => {
1191                            let Sender::P2(tx) = tx else { unreachable!() };
1192                            let (req, out) = store.with(move |mut store| {
1193                                let req = store
1194                                    .data_mut()
1195                                    .http()
1196                                    .new_incoming_request(p2::http::types::Scheme::Http, req)?;
1197                                let out = store.data_mut().http().new_response_outparam(tx)?;
1198                                wasmtime::error::Ok((req, out))
1199                            })?;
1200
1201                            proxy
1202                                .wasi_http_incoming_handler()
1203                                .call_handle(store, req, out)
1204                                .await
1205                        }
1206                        Proxy::P3(proxy) => {
1207                            use wasmtime_wasi_http::p3::bindings::http::types::{
1208                                ErrorCode, Request,
1209                            };
1210
1211                            let Sender::P3(tx) = tx else { unreachable!() };
1212                            let (req, body) = req.into_parts();
1213                            let body = body.map_err(ErrorCode::from_hyper_request_error);
1214                            let req = http::Request::from_parts(req, body);
1215                            let (request, request_io_result) = Request::from_http(req);
1216                            let res = proxy.handle(store, request).await??;
1217                            let res = store
1218                                .with(|mut store| res.into_http(&mut store, request_io_result))?;
1219
1220                            // With the guest response now transformed into a
1221                            // host-compatible response layer one more wrapper
1222                            // around the body. This layer is solely responsible
1223                            // for dropping a channel half on destruction, and
1224                            // this enables waiting here until the body is
1225                            // consumed by waiting for this destruction to
1226                            // happen.
1227                            let (resp_body_tx, resp_body_rx) = oneshot::channel();
1228                            let res = res.map(|body| {
1229                                let body = body.map_err(|e| e.into());
1230                                P3BodyWrapper {
1231                                    _tx: resp_body_tx,
1232                                    body,
1233                                }
1234                                .boxed_unsync()
1235                            });
1236
1237                            // If `wasmtime serve` is waiting on this response
1238                            // and actually got it then wait for the body to
1239                            // finish, otherwise it's thrown away so skip that
1240                            // step.
1241                            if tx.send(res).is_ok() {
1242                                _ = resp_body_rx.await;
1243                            }
1244
1245                            Ok(())
1246                        }
1247                    }
1248                }
1249                .map(move |result| {
1250                    if let Err(error) = result {
1251                        eprintln!("[{req_id}] :: {error:?}");
1252                    }
1253                }),
1254            )
1255        }),
1256    );
1257
1258    return Ok(match rx {
1259        Receiver::P2(rx) => rx
1260            .await
1261            .context("guest never invoked `response-outparam::set` method")?
1262            .map_err(|e| wasmtime::Error::from(e))?
1263            .map(|body| body.map_err(|e| e.into()).boxed_unsync()),
1264        Receiver::P3(rx) => rx.await?,
1265    });
1266
1267    // Forwarding implementation of `Body` to an inner `B` with the sole purpose
1268    // of carrying `_tx` to its destruction.
1269    struct P3BodyWrapper<B> {
1270        body: B,
1271        _tx: oneshot::Sender<()>,
1272    }
1273
1274    impl<B: Body + Unpin> Body for P3BodyWrapper<B> {
1275        type Data = B::Data;
1276        type Error = B::Error;
1277
1278        fn poll_frame(
1279            mut self: Pin<&mut Self>,
1280            cx: &mut Context<'_>,
1281        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
1282            Pin::new(&mut self.body).poll_frame(cx)
1283        }
1284
1285        fn is_end_stream(&self) -> bool {
1286            self.body.is_end_stream()
1287        }
1288
1289        fn size_hint(&self) -> SizeHint {
1290            self.body.size_hint()
1291        }
1292    }
1293}
1294
/// Which of the process's standard streams guest output is forwarded to.
#[derive(Clone)]
enum Output {
    /// The process's standard output.
    Stdout,
    /// The process's standard error.
    Stderr,
}
1300
1301impl Output {
1302    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
1303        use std::io::Write;
1304
1305        match self {
1306            Output::Stdout => std::io::stdout().write_all(buf),
1307            Output::Stderr => std::io::stderr().write_all(buf),
1308        }
1309    }
1310}
1311
1312#[derive(Clone)]
1313struct LogStream {
1314    output: Output,
1315    state: Arc<LogStreamState>,
1316}
1317
/// State shared between all clones of a `LogStream`.
struct LogStreamState {
    /// Text written at the start of every line.
    prefix: String,
    /// `true` when the next non-empty write starts a new line and therefore
    /// has to emit `prefix` first.
    needs_prefix_on_next_write: AtomicBool,
}
1322
1323impl LogStream {
1324    fn new(prefix: String, output: Output) -> LogStream {
1325        LogStream {
1326            output,
1327            state: Arc::new(LogStreamState {
1328                prefix,
1329                needs_prefix_on_next_write: AtomicBool::new(true),
1330            }),
1331        }
1332    }
1333
1334    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
1335        while !bytes.is_empty() {
1336            if self
1337                .state
1338                .needs_prefix_on_next_write
1339                .load(Ordering::Relaxed)
1340            {
1341                self.output.write_all(self.state.prefix.as_bytes())?;
1342                self.state
1343                    .needs_prefix_on_next_write
1344                    .store(false, Ordering::Relaxed);
1345            }
1346            match bytes.iter().position(|b| *b == b'\n') {
1347                Some(i) => {
1348                    let (a, b) = bytes.split_at(i + 1);
1349                    bytes = b;
1350                    self.output.write_all(a)?;
1351                    self.state
1352                        .needs_prefix_on_next_write
1353                        .store(true, Ordering::Relaxed);
1354                }
1355                None => {
1356                    self.output.write_all(bytes)?;
1357                    break;
1358                }
1359            }
1360        }
1361
1362        Ok(())
1363    }
1364}
1365
1366impl wasmtime_wasi::cli::StdoutStream for LogStream {
1367    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
1368        Box::new(self.clone())
1369    }
1370    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
1371        Box::new(self.clone())
1372    }
1373}
1374
1375impl wasmtime_wasi::cli::IsTerminal for LogStream {
1376    fn is_terminal(&self) -> bool {
1377        match &self.output {
1378            Output::Stdout => std::io::stdout().is_terminal(),
1379            Output::Stderr => std::io::stderr().is_terminal(),
1380        }
1381    }
1382}
1383
1384impl wasmtime_wasi::p2::OutputStream for LogStream {
1385    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1386        self.write_all(&bytes)
1387            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1388        Ok(())
1389    }
1390
1391    fn flush(&mut self) -> StreamResult<()> {
1392        Ok(())
1393    }
1394
1395    fn check_write(&mut self) -> StreamResult<usize> {
1396        Ok(1024 * 1024)
1397    }
1398}
1399
1400#[async_trait::async_trait]
1401impl wasmtime_wasi::p2::Pollable for LogStream {
1402    async fn ready(&mut self) {}
1403}
1404
1405impl AsyncWrite for LogStream {
1406    fn poll_write(
1407        mut self: Pin<&mut Self>,
1408        _cx: &mut Context<'_>,
1409        buf: &[u8],
1410    ) -> Poll<io::Result<usize>> {
1411        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1412    }
1413    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1414        Poll::Ready(Ok(()))
1415    }
1416    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1417        Poll::Ready(Ok(()))
1418    }
1419}
1420
1421/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
1422/// try to use it when we can. The main cost of the pooling allocator, however,
1423/// is the virtual memory required to run it. Not all systems support the same
1424/// amount of virtual memory, for example some aarch64 and riscv64 configuration
1425/// only support 39 bits of virtual address space.
1426///
1427/// The pooling allocator, by default, will request 1000 linear memories each
1428/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
1429/// being about 42 bits of the address space. This exceeds the 39 bit limit of
1430/// some systems, so there the pooling allocator will fail by default.
1431///
1432/// This function attempts to dynamically determine the hint for the pooling
1433/// allocator. This returns `Some(true)` if the pooling allocator should be used
1434/// by default, or `None` or an error otherwise.
1435///
1436/// The method for testing this is to allocate a 0-sized 64-bit linear memory
1437/// with a maximum size that's N bits large where we force all memories to be
1438/// static. This should attempt to acquire N bits of the virtual address space.
1439/// If successful that should mean that the pooling allocator is OK to use, but
1440/// if it fails then the pooling allocator is not used and the normal mmap-based
1441/// implementation is used instead.
1442fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1443    use wasmtime::{Config, Memory, MemoryType};
1444    const BITS_TO_TEST: u32 = 42;
1445    let mut config = Config::new();
1446    config.wasm_memory64(true);
1447    config.memory_reservation(1 << BITS_TO_TEST);
1448    let engine = Engine::new(&config)?;
1449    let mut store = Store::new(&engine, ());
1450    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
1451    // page size here from the maximum size.
1452    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1453    if Memory::new(&mut store, ty).is_ok() {
1454        Ok(Some(true))
1455    } else {
1456        Ok(None)
1457    }
1458}