// wasmtime CLI — implementation of the `wasmtime serve` subcommand
// (wasmtime_cli/commands/serve.rs)
1use crate::common::{HttpHooks, Profile, RunCommon, RunTarget};
2use bytes::Bytes;
3use clap::Parser;
4use futures::future::FutureExt;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::UnsyncBoxBody;
8use hyper::body::{Body, Frame, SizeHint};
9use std::convert::Infallible;
10use std::net::SocketAddr;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::{
14    path::PathBuf,
15    sync::{
16        Arc, Mutex,
17        atomic::{AtomicBool, Ordering},
18    },
19    time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker};
24use wasmtime::{
25    Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail, error::Context as _,
26};
27use wasmtime_cli_flags::opt::WasmtimeOptionValue;
28use wasmtime_wasi::p2::{StreamError, StreamResult};
29use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
30#[cfg(feature = "component-model-async")]
31use wasmtime_wasi_http::handler::p2::bindings as p2;
32use wasmtime_wasi_http::handler::{HandlerState, Proxy, ProxyHandler, ProxyPre, StoreBundle};
33use wasmtime_wasi_http::io::TokioIo;
34use wasmtime_wasi_http::{WasiHttpCtx, p2::WasiHttpView};
35
36#[cfg(feature = "wasi-config")]
37use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
38#[cfg(feature = "wasi-keyvalue")]
39use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
40#[cfg(feature = "wasi-nn")]
41use wasmtime_wasi_nn::wit::WasiNnCtx;
42
// Default number of requests a single component instance may serve before it
// is retired and a fresh instance is created. WASIp3 instances default to
// being reused; WASIp2 instances default to serving exactly one request.
const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
// Default number of in-flight requests a single WASIp3 instance may handle
// concurrently (WASIp2 instances cannot be called concurrently, see the
// `--max-instance-concurrent-reuse-count` flag docs).
const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
46
/// Per-store host state backing each component instance spawned by
/// `wasmtime serve`.
struct Host {
    // Resource table shared by all of the WASI views implemented below.
    table: wasmtime::component::ResourceTable,
    // Core WASI context (stdio, env, preopens) built per request/instance.
    ctx: WasiCtx,
    // wasi-http context, used by both the p2 and (when enabled) p3 views.
    http: WasiHttpCtx,
    // Request/response hooks configured via `RunCommon`.
    hooks: HttpHooks,

    // Resource limits installed on the store via `Store::limiter`.
    limits: StoreLimits,

    // Optional contexts, populated only when the corresponding `-S` flag is
    // passed and the feature was compiled in (see `ServeCommand::new_store`).
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    // Present only while guest profiling is active; see `setup_guest_profiler`.
    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
67
impl WasiView for Host {
    /// Projects the core WASI context and resource table out of `Host`.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.ctx,
            table: &mut self.table,
        }
    }
}
76
impl wasmtime_wasi_http::p2::WasiHttpView for Host {
    /// Projects the wasi-http (p2) context, table, and hooks out of `Host`.
    fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
        wasmtime_wasi_http::p2::WasiHttpCtxView {
            ctx: &mut self.http,
            table: &mut self.table,
            hooks: &mut self.hooks,
        }
    }
}
86
// The p3 view shares the same `WasiHttpCtx`/table/hooks as the p2 view above.
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
    /// Projects the wasi-http (p3) context, table, and hooks out of `Host`.
    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
        wasmtime_wasi_http::p3::WasiHttpCtxView {
            table: &mut self.table,
            ctx: &mut self.http,
            hooks: &mut self.hooks,
        }
    }
}
97
/// Default bind address for the HTTP server: every IPv4 interface, port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
    8080,
);
102
/// clap value-parser for duration flags (e.g. `1s`, `500ms`, `10us`, `7ns`).
///
/// Note: `Duration::parse` here is the `WasmtimeOptionValue` trait method
/// (imported from `wasmtime_cli_flags::opt`), not an inherent method on
/// `std::time::Duration`; errors are stringified for clap.
fn parse_duration(s: &str) -> Result<Duration, String> {
    Duration::parse(Some(s)).map_err(|e| e.to_string())
}
106
107/// Runs a WebAssembly module
108#[derive(Parser)]
109pub struct ServeCommand {
110    #[command(flatten)]
111    run: RunCommon,
112
113    /// Socket address for the web server to bind to.
114    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
115    addr: SocketAddr,
116
117    /// Socket address where, when connected to, will initiate a graceful
118    /// shutdown.
119    ///
120    /// Note that graceful shutdown is also supported on ctrl-c.
121    #[arg(long, value_name = "SOCKADDR")]
122    shutdown_addr: Option<SocketAddr>,
123
124    /// Disable log prefixes of wasi-http handlers.
125    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
126    #[arg(long)]
127    no_logging_prefix: bool,
128
129    /// The WebAssembly component to run.
130    #[arg(value_name = "WASM", required = true)]
131    component: PathBuf,
132
133    /// Maximum number of requests to send to a single component instance before
134    /// dropping it.
135    ///
136    /// This defaults to 1 for WASIp2 components and 128 for WASIp3 components.
137    #[arg(long)]
138    max_instance_reuse_count: Option<usize>,
139
140    /// Maximum number of concurrent requests to send to a single component
141    /// instance.
142    ///
143    /// This defaults to 1 for WASIp2 components and 16 for WASIp3 components.
144    /// Note that setting it to more than 1 will have no effect for WASIp2
145    /// components since they cannot be called concurrently.
146    #[arg(long)]
147    max_instance_concurrent_reuse_count: Option<usize>,
148
149    /// Time to hold an idle component instance for possible reuse before
150    /// dropping it.
151    ///
152    /// A number with no suffix or with an `s` suffix is interpreted as seconds;
153    /// other accepted suffixes include `ms` (milliseconds), `us` or `μs`
154    /// (microseconds), and `ns` (nanoseconds).
155    #[arg(long, default_value = "1s", value_parser = parse_duration)]
156    idle_instance_timeout: Duration,
157}
158
impl ServeCommand {
    /// Start a server to run the given wasi-http proxy component
    pub fn execute(mut self) -> Result<()> {
        self.run.common.init_logging()?;

        // We force cli errors before starting to listen for connections so then
        // we don't accidentally delay them to the first request.

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("wasi-threads does not support components yet")
        }

        // The serve command requires both wasi-http and the component model, so
        // we enable those by default here. `replace` returns the prior value,
        // so an explicit `-S http=n` / `-W component-model=n` is rejected.
        if self.run.common.wasi.http.replace(true) == Some(false) {
            bail!("wasi-http is required for the serve command, and must not be disabled");
        }
        if self.run.common.wasm.component_model.replace(true) == Some(false) {
            bail!("components are required for the serve command, and must not be disabled");
        }

        // Build a multi-threaded tokio runtime and drive the async `serve`
        // loop to completion on it.
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .build()?;

        runtime.block_on(self.serve())?;

        Ok(())
    }

    /// Builds a fresh `Store<Host>` for one component instance.
    ///
    /// `req_id` is `Some` when the store is dedicated to a single request
    /// (instance-reuse count of 1); in that case it is exposed to the guest
    /// via the `REQUEST_ID` env var and used to prefix stdout/stderr logs
    /// (unless `--no-logging-prefix` was passed).
    fn new_store(&self, engine: &Engine, req_id: Option<u64>) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        if let Some(req_id) = req_id {
            builder.env("REQUEST_ID", req_id.to_string());
        }

        // Choose log prefixes: per-request prefixes only when a request id is
        // available and prefixing wasn't disabled on the CLI.
        let stdout_prefix: String;
        let stderr_prefix: String;
        match req_id {
            Some(req_id) if !self.no_logging_prefix => {
                stdout_prefix = format!("stdout [{req_id}] :: ");
                stderr_prefix = format!("stderr [{req_id}] :: ");
            }
            _ => {
                stdout_prefix = "".to_string();
                stderr_prefix = "".to_string();
            }
        }
        // `LogStream`/`Output` are defined elsewhere in this file.
        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

        let mut table = wasmtime::component::ResourceTable::new();
        if let Some(max) = self.run.common.wasi.max_resources {
            table.set_max_capacity(max);
        }
        let mut host = Host {
            table,
            ctx: builder.build(),
            http: self.run.wasi_http_ctx()?,
            hooks: self.run.wasi_http_hooks(),

            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-config")]
            wasi_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
            #[cfg(feature = "profiling")]
            guest_profiler: None,
        };

        // Optional host contexts, filled in only when the matching `-S` flag
        // was passed (feature availability was validated in `execute` /
        // `add_to_linker`).
        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(feature = "wasi-config")]
            {
                let vars = WasiConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
            store.set_hostcall_fuel(fuel);
        }

        // Install the resource limiter configured via CLI flags.
        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        // If fuel has been configured, we want to add the configured
        // fuel amount to this store.
        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }

    /// Registers all enabled WASI host interfaces on `linker`, validating
    /// flag/feature combinations along the way.
    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        self.run.validate_p3_option()?;
        let cli = self.run.validate_cli_enabled()?;

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings which adds just those interfaces that the proxy interface
        // uses.
        if cli == Some(true) {
            self.run.add_wasmtime_wasi_to_linker(linker)?;
            wasmtime_wasi_http::p2::add_only_http_to_linker_async(linker)?;
            #[cfg(feature = "component-model-async")]
            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
                wasmtime_wasi_http::p3::add_to_linker(linker)?;
            }
        } else {
            wasmtime_wasi_http::p2::add_to_linker_async(linker)?;
            #[cfg(feature = "component-model-async")]
            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
                // Without `-Scli` the p3 proxy world still needs these
                // supporting interfaces in addition to wasi-http itself.
                wasmtime_wasi_http::p3::add_to_linker(linker)?;
                wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
                wasmtime_wasi::p3::random::add_to_linker(linker)?;
                wasmtime_wasi::p3::cli::add_to_linker(linker)?;
            }
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                // `unwrap` is safe at call time: `new_store` populates `nn`
                // whenever this flag is set and the feature is compiled in.
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }

    /// Main async entry point: configures the engine, binds the listener,
    /// and runs the accept loop until a graceful-shutdown signal arrives.
    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);

        // Epoch interruption is needed both for request timeouts and for
        // guest-profiler sampling, below.
        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }
            Some(Profile::Guest { .. }) => {
                config.epoch_interruption(true);
            }
            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        // Pre-instantiate, preferring the p3 `Service` world when the
        // component exports it and falling back to the p2 proxy world.
        let instance = linker.instantiate_pre(&component)?;
        #[cfg(feature = "component-model-async")]
        let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
            Ok(pre) => ProxyPre::P3(pre),
            Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
        };
        #[cfg(not(feature = "component-model-async"))]
        let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);

        // Spawn background task(s) waiting for graceful shutdown signals. This
        // always listens for ctrl-c but additionally can listen for a TCP
        // connection to the specified address.
        let shutdown = Arc::new(GracefulShutdown::default());
        tokio::task::spawn({
            let shutdown = shutdown.clone();
            async move {
                tokio::signal::ctrl_c().await.unwrap();
                shutdown.requested.notify_one();
            }
        });
        if let Some(addr) = self.shutdown_addr {
            let listener = tokio::net::TcpListener::bind(addr).await?;
            eprintln!(
                "Listening for shutdown on tcp://{}/",
                listener.local_addr()?
            );
            let shutdown = shutdown.clone();
            tokio::task::spawn(async move {
                let _ = listener.accept().await;
                shutdown.requested.notify_one();
            });
        }

        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Conditionally enable `SO_REUSEADDR` depending on the current
        // platform. On Unix we want this to be able to rebind an address in
        // the `TIME_WAIT` state which can happen then a server is killed with
        // active TCP connections and then restarted. On Windows though if
        // `SO_REUSEADDR` is specified then it enables multiple applications to
        // bind the port at the same time which is not something we want. Hence
        // this is conditionally set based on the platform (and deviates from
        // Tokio's default from always-on).
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        log::info!("Listening on {}", self.addr);

        // Spawn the epoch-ticker thread when either guest profiling or a
        // request timeout requires epoch interruption.
        let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
            Some(interval)
        } else if let Some(t) = self.run.common.wasm.timeout {
            Some(EPOCH_INTERRUPT_PERIOD.min(t))
        } else {
            None
        };
        let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));

        // Resolve instance-reuse defaults based on whether the component uses
        // the p3 (reusable/concurrent) or p2 (one-shot) world.
        let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
            if let ProxyPre::P3(_) = &instance {
                DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
            } else {
                DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
            }
        });

        let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
            self.max_instance_concurrent_reuse_count
                .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
        } else {
            1
        };

        let handler = ProxyHandler::new(
            HostHandlerState {
                cmd: self,
                engine,
                component,
                max_instance_reuse_count,
                max_instance_concurrent_reuse_count,
            },
            instance,
        );

        loop {
            // Wait for a socket, but also "race" against shutdown to break out
            // of this loop. Once the graceful shutdown signal is received then
            // this loop exits immediately.
            let (stream, _) = tokio::select! {
                _ = shutdown.requested.notified() => break,
                v = listener.accept() => v?,
            };

            // The Nagle algorithm can impose a significant latency penalty
            // (e.g. 40ms on Linux) on guests which write small, intermittent
            // response body chunks (e.g. SSE streams).  Here we disable that
            // algorithm and rely on the guest to buffer if appropriate to avoid
            // TCP fragmentation.
            stream.set_nodelay(true)?;

            let stream = TokioIo::new(stream);
            let h = handler.clone();
            // The guard keeps `GracefulShutdown`'s task count accurate; it's
            // dropped when this connection task finishes.
            let shutdown_guard = shutdown.clone().increment();
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| {
                            let h = h.clone();
                            async move {
                                use http_body_util::{BodyExt, Full};
                                // Any error from the guest is reported to the
                                // client as a generic 500 page rather than
                                // failing the connection.
                                match handle_request(h, req).await {
                                    Ok(r) => Ok::<_, Infallible>(r),
                                    Err(e) => {
                                        eprintln!("error: {e:?}");
                                        let error_html = "\
<!doctype html>
<html>
<head>
    <title>500 Internal Server Error</title>
</head>
<body>
    <center>
        <h1>500 Internal Server Error</h1>
        <hr>
        wasmtime
    </center>
</body>
</html>";
                                        Ok(Response::builder()
                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
                                            .header("Content-Type", "text/html; charset=UTF-8")
                                            .body(
                                                Full::new(bytes::Bytes::from(error_html))
                                                    .map_err(|_| unreachable!())
                                                    .boxed_unsync(),
                                            )
                                            .unwrap())
                                    }
                                }
                            }
                        }),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
                drop(shutdown_guard);
            });
        }

        // Upon exiting the loop we'll no longer process any more incoming
        // connections but there may still be outstanding connections
        // processing in child tasks. If there are wait for those to complete
        // before shutting down completely. Also enable short-circuiting this
        // wait with a second ctrl-c signal.
        if shutdown.close() {
            return Ok(());
        }
        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = shutdown.complete.notified() => {}
        }

        Ok(())
    }
}
595
/// Shared state handed to `ProxyHandler`, bridging the CLI configuration to
/// the wasi-http handler machinery (store creation, reuse limits, timeouts).
struct HostHandlerState {
    // The parsed `serve` command; `serve()` moves `self` in here.
    cmd: ServeCommand,
    engine: Engine,
    // Kept so each new store can wire up the guest profiler if enabled.
    component: Component,
    // Resolved reuse limits (CLI flags or p2/p3 defaults; see `serve()`).
    max_instance_reuse_count: usize,
    max_instance_concurrent_reuse_count: usize,
}
603
impl HandlerState for HostHandlerState {
    type StoreData = Host;

    /// Creates a fresh store (plus its profile-flush callback) for a worker;
    /// `req_id` is provided when the instance will serve a single request.
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Host>> {
        let mut store = self.cmd.new_store(&self.engine, req_id)?;
        let write_profile = setup_epoch_handler(&self.cmd, &mut store, self.component.clone())?;

        Ok(StoreBundle {
            store,
            write_profile,
        })
    }

    /// Per-request wall-clock budget; effectively unlimited when no
    /// `-W timeout` was given.
    fn request_timeout(&self) -> Duration {
        self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX)
    }

    /// How long an idle instance is retained for reuse (`--idle-instance-timeout`).
    fn idle_instance_timeout(&self) -> Duration {
        self.cmd.idle_instance_timeout
    }

    fn max_instance_reuse_count(&self) -> usize {
        self.max_instance_reuse_count
    }

    fn max_instance_concurrent_reuse_count(&self) -> usize {
        self.max_instance_concurrent_reuse_count
    }

    /// Worker errors are reported to stderr; they don't abort the server.
    fn handle_worker_error(&self, error: wasmtime::Error) {
        eprintln!("worker error: {error}");
    }
}
637
/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}
649
#[derive(Default)]
struct GracefulShutdownState {
    // Number of spawned connection tasks still running.
    active_tasks: u32,
    // Set by `close()`; once true, the last task to finish fires `complete`.
    notify_when_done: bool,
}
655
656impl GracefulShutdown {
657    /// Increments the number of active tasks and returns a guard indicating
658    fn increment(self: Arc<Self>) -> impl Drop {
659        struct Guard(Arc<GracefulShutdown>);
660
661        let mut state = self.state.lock().unwrap();
662        assert!(!state.notify_when_done);
663        state.active_tasks += 1;
664        drop(state);
665
666        return Guard(self);
667
668        impl Drop for Guard {
669            fn drop(&mut self) {
670                let mut state = self.0.state.lock().unwrap();
671                state.active_tasks -= 1;
672                if state.notify_when_done && state.active_tasks == 0 {
673                    self.0.complete.notify_one();
674                }
675            }
676        }
677    }
678
679    /// Flags this state as done spawning tasks and returns whether there are no
680    /// more child tasks remaining.
681    fn close(&self) -> bool {
682        let mut state = self.state.lock().unwrap();
683        state.notify_when_done = true;
684        state.active_tasks == 0
685    }
686}
687
/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
/// (See `serve()`: the effective period is `min(this, timeout)`.)
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
692
/// Background thread which ticks the engine's epoch at a fixed interval;
/// stopped and joined on drop.
struct EpochThread {
    // Set to `true` to ask the thread to exit after its next sleep.
    shutdown: Arc<AtomicBool>,
    // `Some` while the thread is running; taken by `Drop` to join it.
    handle: Option<std::thread::JoinHandle<()>>,
}
697
698impl EpochThread {
699    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
700        let shutdown = Arc::new(AtomicBool::new(false));
701        let handle = {
702            let shutdown = Arc::clone(&shutdown);
703            let handle = std::thread::spawn(move || {
704                while !shutdown.load(Ordering::Relaxed) {
705                    std::thread::sleep(interval);
706                    engine.increment_epoch();
707                }
708            });
709            Some(handle)
710        };
711
712        EpochThread { shutdown, handle }
713    }
714}
715
716impl Drop for EpochThread {
717    fn drop(&mut self) {
718        if let Some(handle) = self.handle.take() {
719            self.shutdown.store(true, Ordering::Relaxed);
720            handle.join().unwrap();
721        }
722    }
723}
724
725type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
726
/// Configures how this store reacts to epoch interrupts and returns the
/// profile-flush callback for when the store is retired.
///
/// With `--profile guest` the guest profiler takes over the epoch callback
/// (see `setup_guest_profiler`); otherwise, when a request timeout is set,
/// each epoch tick yields to the async executor so the timeout can fire.
fn setup_epoch_handler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    component: Component,
) -> Result<WriteProfile> {
    // Profiling Enabled
    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
        #[cfg(feature = "profiling")]
        return setup_guest_profiler(store, path.clone(), *interval, component.clone());
        #[cfg(not(feature = "profiling"))]
        {
            // Silence unused-variable warnings in the non-profiling build.
            let _ = (path, interval);
            bail!("support for profiling disabled at compile time!");
        }
    }

    // Profiling disabled but there's a global request timeout
    if cmd.run.common.wasm.timeout.is_some() {
        store.epoch_deadline_async_yield_and_update(1);
    }

    // No profiling: nothing to write when the store is retired.
    Ok(Box::new(|_store| {}))
}
750
#[cfg(feature = "profiling")]
/// Installs a `GuestProfiler` on `store` sampling at every epoch tick, and
/// returns a callback that writes the collected profile to `path` (Firefox
/// Profiler format) when the store is retired.
fn setup_guest_profiler(
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<WriteProfile> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        store.engine(),
        module_name,
        interval,
        component,
        std::iter::empty(),
    )?));

    // Temporarily takes the profiler out of the store so `f` can borrow both
    // the profiler (mutably) and the store context, then puts it back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    // Take a sample on every epoch tick and keep the guest running
    // (deadline pushed out by 1 each time).
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);

    let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(wasmtime::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok(write_profile)
}
815
816type Request = hyper::Request<hyper::body::Incoming>;
817
/// Dispatches one incoming HTTP request to the wasm component behind
/// `handler`, bridging between hyper and either the WASIp2 or WASIp3
/// `wasi:http` proxy world.
///
/// The guest runs on a task spawned through `handler`; its response is
/// delivered back here over a oneshot channel. Returns the guest-produced
/// response, or an error if the guest traps or never produces one.
async fn handle_request(
    handler: ProxyHandler<HostHandlerState>,
    req: Request,
) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
    use tokio::sync::oneshot;

    // Monotonic id used to correlate log lines for this request.
    let req_id = handler.next_req_id();

    log::info!(
        "Request {req_id} handling {} to {}",
        req.method(),
        req.uri()
    );

    // Here we must declare different channel types for p2 and p3 since p2's
    // `WasiHttpView::new_response_outparam` expects a specific kind of sender
    // that uses `p2::http::types::ErrorCode`, and we don't want to have to
    // convert from the p3 `ErrorCode` to the p2 one, only to convert again to
    // `wasmtime::Error`.

    type P2Response = Result<
        hyper::Response<wasmtime_wasi_http::p2::body::HyperOutgoingBody>,
        p2::http::types::ErrorCode,
    >;
    type P3Response = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;

    enum Sender {
        P2(oneshot::Sender<P2Response>),
        P3(oneshot::Sender<P3Response>),
    }

    enum Receiver {
        P2(oneshot::Receiver<P2Response>),
        P3(oneshot::Receiver<P3Response>),
    }

    // Pick the channel flavor matching the instantiated world; the sender
    // travels into the spawned guest task while the receiver stays here.
    let (tx, rx) = match handler.instance_pre() {
        ProxyPre::P2(_) => {
            let (tx, rx) = oneshot::channel();
            (Sender::P2(tx), Receiver::P2(rx))
        }
        ProxyPre::P3(_) => {
            let (tx, rx) = oneshot::channel();
            (Sender::P3(tx), Receiver::P3(rx))
        }
    };

    // NOTE(review): the request id is forwarded to `spawn` only when each
    // instance serves exactly one request — presumably so per-instance
    // output can be labeled per-request; confirm against
    // `ProxyHandler::spawn`'s contract.
    handler.spawn(
        if handler.state().max_instance_reuse_count() == 1 {
            Some(req_id)
        } else {
            None
        },
        Box::new(move |store, proxy| {
            Box::pin(
                async move {
                    match proxy {
                        Proxy::P2(proxy) => {
                            let Sender::P2(tx) = tx else { unreachable!() };
                            let (req, out) = store.with(move |mut store| {
                                let req = store
                                    .data_mut()
                                    .http()
                                    .new_incoming_request(p2::http::types::Scheme::Http, req)?;
                                let out = store.data_mut().http().new_response_outparam(tx)?;
                                wasmtime::error::Ok((req, out))
                            })?;

                            proxy
                                .wasi_http_incoming_handler()
                                .call_handle(store, req, out)
                                .await
                        }
                        Proxy::P3(proxy) => {
                            use wasmtime_wasi_http::p3::bindings::http::types::{
                                ErrorCode, Request,
                            };

                            let Sender::P3(tx) = tx else { unreachable!() };
                            let (req, body) = req.into_parts();
                            let body = body.map_err(ErrorCode::from_hyper_request_error);
                            let req = http::Request::from_parts(req, body);
                            let (request, request_io_result) = Request::from_http(req);
                            // Two layers of fallibility here (`??`): the
                            // invocation itself, then the guest's own result.
                            let res = proxy.handle(store, request).await??;
                            let res = store
                                .with(|mut store| res.into_http(&mut store, request_io_result))?;

                            // With the guest response now transformed into a
                            // host-compatible response layer one more wrapper
                            // around the body. This layer is solely responsible
                            // for dropping a channel half on destruction, and
                            // this enables waiting here until the body is
                            // consumed by waiting for this destruction to
                            // happen.
                            let (resp_body_tx, resp_body_rx) = oneshot::channel();
                            let res = res.map(|body| {
                                let body = body.map_err(|e| e.into());
                                P3BodyWrapper {
                                    _tx: resp_body_tx,
                                    body,
                                }
                                .boxed_unsync()
                            });

                            // If `wasmtime serve` is waiting on this response
                            // and actually got it then wait for the body to
                            // finish, otherwise it's thrown away so skip that
                            // step.
                            if tx.send(res).is_ok() {
                                _ = resp_body_rx.await;
                            }

                            Ok(())
                        }
                    }
                }
                .map(move |result| {
                    // Errors from the guest task are reported here rather than
                    // propagated, since the HTTP response path may already be
                    // gone.
                    if let Err(error) = result {
                        eprintln!("[{req_id}] :: {error:?}");
                    }
                }),
            )
        }),
    );

    // Wait for the guest's response. For p2 the guest may drop the sender
    // without ever calling `response-outparam::set`, which surfaces as the
    // channel error contextualized below.
    return Ok(match rx {
        Receiver::P2(rx) => rx
            .await
            .context("guest never invoked `response-outparam::set` method")?
            .map_err(|e| wasmtime::Error::from(e))?
            .map(|body| body.map_err(|e| e.into()).boxed_unsync()),
        Receiver::P3(rx) => rx.await?,
    });

    // Forwarding implementation of `Body` to an inner `B` with the sole purpose
    // of carrying `_tx` to its destruction.
    struct P3BodyWrapper<B> {
        body: B,
        _tx: oneshot::Sender<()>,
    }

    impl<B: Body + Unpin> Body for P3BodyWrapper<B> {
        type Data = B::Data;
        type Error = B::Error;

        fn poll_frame(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
            Pin::new(&mut self.body).poll_frame(cx)
        }

        fn is_end_stream(&self) -> bool {
            self.body.is_end_stream()
        }

        fn size_hint(&self) -> SizeHint {
            self.body.size_hint()
        }
    }
}
979
/// Which host standard stream guest output is forwarded to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}
985
986impl Output {
987    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
988        use std::io::Write;
989
990        match self {
991            Output::Stdout => std::io::stdout().write_all(buf),
992            Output::Stderr => std::io::stderr().write_all(buf),
993        }
994    }
995}
996
/// A cloneable writer that prefixes every output line with a fixed string.
/// Clones share the same prefix bookkeeping via `Arc`.
#[derive(Clone)]
struct LogStream {
    // Destination host stream (stdout or stderr).
    output: Output,
    // Shared so that all clones agree on whether the next write starts a
    // new line (and therefore needs the prefix).
    state: Arc<LogStreamState>,
}
1002
/// State shared between all clones of a `LogStream`.
struct LogStreamState {
    // Text emitted at the start of each output line.
    prefix: String,
    // Set after a newline is written; when set, the next write emits
    // `prefix` first. Accessed with `Ordering::Relaxed` loads/stores.
    needs_prefix_on_next_write: AtomicBool,
}
1007
1008impl LogStream {
1009    fn new(prefix: String, output: Output) -> LogStream {
1010        LogStream {
1011            output,
1012            state: Arc::new(LogStreamState {
1013                prefix,
1014                needs_prefix_on_next_write: AtomicBool::new(true),
1015            }),
1016        }
1017    }
1018
1019    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
1020        while !bytes.is_empty() {
1021            if self
1022                .state
1023                .needs_prefix_on_next_write
1024                .load(Ordering::Relaxed)
1025            {
1026                self.output.write_all(self.state.prefix.as_bytes())?;
1027                self.state
1028                    .needs_prefix_on_next_write
1029                    .store(false, Ordering::Relaxed);
1030            }
1031            match bytes.iter().position(|b| *b == b'\n') {
1032                Some(i) => {
1033                    let (a, b) = bytes.split_at(i + 1);
1034                    bytes = b;
1035                    self.output.write_all(a)?;
1036                    self.state
1037                        .needs_prefix_on_next_write
1038                        .store(true, Ordering::Relaxed);
1039                }
1040                None => {
1041                    self.output.write_all(bytes)?;
1042                    break;
1043                }
1044            }
1045        }
1046
1047        Ok(())
1048    }
1049}
1050
// Let a `LogStream` act as a guest's stdout/stderr in both the p2
// (synchronous `OutputStream`) and async (`AsyncWrite`) flavors. Each
// accessor hands out an independent clone sharing the same prefix state.
impl wasmtime_wasi::cli::StdoutStream for LogStream {
    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
        Box::new(self.clone())
    }
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        Box::new(self.clone())
    }
}
1059
impl wasmtime_wasi::cli::IsTerminal for LogStream {
    // Report whether the underlying host stream is attached to a terminal,
    // so guests can adapt their output accordingly.
    fn is_terminal(&self) -> bool {
        match &self.output {
            Output::Stdout => std::io::stdout().is_terminal(),
            Output::Stderr => std::io::stderr().is_terminal(),
        }
    }
}
1068
1069impl wasmtime_wasi::p2::OutputStream for LogStream {
1070    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1071        self.write_all(&bytes)
1072            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1073        Ok(())
1074    }
1075
1076    fn flush(&mut self) -> StreamResult<()> {
1077        Ok(())
1078    }
1079
1080    fn check_write(&mut self) -> StreamResult<usize> {
1081        Ok(1024 * 1024)
1082    }
1083}
1084
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    // Writes complete synchronously, so the stream is always ready.
    async fn ready(&mut self) {}
}
1089
1090impl AsyncWrite for LogStream {
1091    fn poll_write(
1092        mut self: Pin<&mut Self>,
1093        _cx: &mut Context<'_>,
1094        buf: &[u8],
1095    ) -> Poll<io::Result<usize>> {
1096        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1097    }
1098    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1099        Poll::Ready(Ok(()))
1100    }
1101    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1102        Poll::Ready(Ok(()))
1103    }
1104}
1105
1106/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
1107/// try to use it when we can. The main cost of the pooling allocator, however,
1108/// is the virtual memory required to run it. Not all systems support the same
1109/// amount of virtual memory, for example some aarch64 and riscv64 configuration
1110/// only support 39 bits of virtual address space.
1111///
1112/// The pooling allocator, by default, will request 1000 linear memories each
1113/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
1114/// being about 42 bits of the address space. This exceeds the 39 bit limit of
1115/// some systems, so there the pooling allocator will fail by default.
1116///
1117/// This function attempts to dynamically determine the hint for the pooling
1118/// allocator. This returns `Some(true)` if the pooling allocator should be used
1119/// by default, or `None` or an error otherwise.
1120///
1121/// The method for testing this is to allocate a 0-sized 64-bit linear memory
1122/// with a maximum size that's N bits large where we force all memories to be
1123/// static. This should attempt to acquire N bits of the virtual address space.
1124/// If successful that should mean that the pooling allocator is OK to use, but
1125/// if it fails then the pooling allocator is not used and the normal mmap-based
1126/// implementation is used instead.
1127fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1128    use wasmtime::{Config, Memory, MemoryType};
1129    const BITS_TO_TEST: u32 = 42;
1130    let mut config = Config::new();
1131    config.wasm_memory64(true);
1132    config.memory_reservation(1 << BITS_TO_TEST);
1133    let engine = Engine::new(&config)?;
1134    let mut store = Store::new(&engine, ());
1135    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
1136    // page size here from the maximum size.
1137    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1138    if Memory::new(&mut store, ty).is_ok() {
1139        Ok(Some(true))
1140    } else {
1141        Ok(None)
1142    }
1143}