// wasmtime_cli/commands/serve.rs — implementation of the `wasmtime serve` subcommand.
1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{anyhow, bail, Result};
3use clap::Parser;
4use std::net::SocketAddr;
5use std::{
6    path::PathBuf,
7    sync::{
8        atomic::{AtomicBool, AtomicU64, Ordering},
9        Arc,
10    },
11};
12use wasmtime::component::Linker;
13use wasmtime::{Config, Engine, Memory, MemoryType, Store, StoreLimits};
14use wasmtime_wasi::{StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
15use wasmtime_wasi_http::bindings::http::types::Scheme;
16use wasmtime_wasi_http::bindings::ProxyPre;
17use wasmtime_wasi_http::io::TokioIo;
18use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};
19
20#[cfg(feature = "wasi-keyvalue")]
21use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
22#[cfg(feature = "wasi-nn")]
23use wasmtime_wasi_nn::wit::WasiNnCtx;
24#[cfg(feature = "wasi-runtime-config")]
25use wasmtime_wasi_runtime_config::{WasiRuntimeConfig, WasiRuntimeConfigVariables};
26
/// Per-request host state stored in each `Store`.
///
/// Built by `ServeCommand::new_store` for every incoming request; holds the
/// component resource table, the WASI and wasi-http contexts, the store's
/// resource limits, and the optional feature-gated contexts that are only
/// populated when the corresponding `-S` CLI options are enabled.
struct Host {
    table: wasmtime::component::ResourceTable,
    ctx: WasiCtx,
    http: WasiHttpCtx,

    // Applied to the store via `Store::limiter` in `new_store`.
    limits: StoreLimits,

    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-runtime-config")]
    wasi_runtime_config: Option<WasiRuntimeConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,
}
43
// Expose the resource table and WASI context to the WASI host implementation.
impl WasiView for Host {
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }

    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.ctx
    }
}
53
// Expose the same resource table plus the HTTP context to the wasi-http host
// implementation (the table is shared between the two views).
impl WasiHttpView for Host {
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }

    fn ctx(&mut self) -> &mut WasiHttpCtx {
        &mut self.http
    }
}
63
/// Default bind address for the server: all IPv4 interfaces, port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
    8080,
);
68
69/// Runs a WebAssembly module
70#[derive(Parser, PartialEq)]
71pub struct ServeCommand {
72    #[command(flatten)]
73    run: RunCommon,
74
75    /// Socket address for the web server to bind to.
76    #[arg(long = "addr", value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR )]
77    addr: SocketAddr,
78
79    /// The WebAssembly component to run.
80    #[arg(value_name = "WASM", required = true)]
81    component: PathBuf,
82}
83
84impl ServeCommand {
85    /// Start a server to run the given wasi-http proxy component
86    pub fn execute(mut self) -> Result<()> {
87        self.run.common.init_logging()?;
88
89        // We force cli errors before starting to listen for connections so then
90        // we don't accidentally delay them to the first request.
91        if let Some(Profile::Guest { .. }) = &self.run.profile {
92            bail!("Cannot use the guest profiler with components");
93        }
94
95        if self.run.common.wasi.nn == Some(true) {
96            #[cfg(not(feature = "wasi-nn"))]
97            {
98                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
99            }
100        }
101
102        if self.run.common.wasi.threads == Some(true) {
103            bail!("wasi-threads does not support components yet")
104        }
105
106        // The serve command requires both wasi-http and the component model, so
107        // we enable those by default here.
108        if self.run.common.wasi.http.replace(true) == Some(false) {
109            bail!("wasi-http is required for the serve command, and must not be disabled");
110        }
111        if self.run.common.wasm.component_model.replace(true) == Some(false) {
112            bail!("components are required for the serve command, and must not be disabled");
113        }
114
115        let runtime = tokio::runtime::Builder::new_multi_thread()
116            .enable_time()
117            .enable_io()
118            .build()?;
119
120        runtime.block_on(async move {
121            tokio::select! {
122                _ = tokio::signal::ctrl_c() => {
123                    Ok::<_, anyhow::Error>(())
124                }
125
126                res = self.serve() => {
127                    res
128                }
129            }
130        })?;
131
132        Ok(())
133    }
134
135    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
136        let mut builder = WasiCtxBuilder::new();
137        self.run.configure_wasip2(&mut builder)?;
138
139        builder.env("REQUEST_ID", req_id.to_string());
140
141        builder.stdout(LogStream::new(
142            format!("stdout [{req_id}] :: "),
143            Output::Stdout,
144        ));
145
146        builder.stderr(LogStream::new(
147            format!("stderr [{req_id}] :: "),
148            Output::Stderr,
149        ));
150
151        let mut table = wasmtime::component::ResourceTable::new();
152        if let Some(max) = self.run.common.wasi.max_resources {
153            table.set_max_capacity(max);
154        }
155        let mut host = Host {
156            table,
157            ctx: builder.build(),
158            http: self.run.wasi_http_ctx()?,
159
160            limits: StoreLimits::default(),
161
162            #[cfg(feature = "wasi-nn")]
163            nn: None,
164            #[cfg(feature = "wasi-runtime-config")]
165            wasi_runtime_config: None,
166            #[cfg(feature = "wasi-keyvalue")]
167            wasi_keyvalue: None,
168        };
169
170        if self.run.common.wasi.nn == Some(true) {
171            #[cfg(feature = "wasi-nn")]
172            {
173                let graphs = self
174                    .run
175                    .common
176                    .wasi
177                    .nn_graph
178                    .iter()
179                    .map(|g| (g.format.clone(), g.dir.clone()))
180                    .collect::<Vec<_>>();
181                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
182                host.nn.replace(WasiNnCtx::new(backends, registry));
183            }
184        }
185
186        if self.run.common.wasi.runtime_config == Some(true) {
187            #[cfg(feature = "wasi-runtime-config")]
188            {
189                let vars = WasiRuntimeConfigVariables::from_iter(
190                    self.run
191                        .common
192                        .wasi
193                        .runtime_config_var
194                        .iter()
195                        .map(|v| (v.key.clone(), v.value.clone())),
196                );
197                host.wasi_runtime_config.replace(vars);
198            }
199        }
200
201        if self.run.common.wasi.keyvalue == Some(true) {
202            #[cfg(feature = "wasi-keyvalue")]
203            {
204                let ctx = WasiKeyValueCtxBuilder::new()
205                    .in_memory_data(
206                        self.run
207                            .common
208                            .wasi
209                            .keyvalue_in_memory_data
210                            .iter()
211                            .map(|v| (v.key.clone(), v.value.clone())),
212                    )
213                    .build();
214                host.wasi_keyvalue.replace(ctx);
215            }
216        }
217
218        let mut store = Store::new(engine, host);
219
220        if self.run.common.wasm.timeout.is_some() {
221            store.set_epoch_deadline(u64::from(EPOCH_PRECISION) + 1);
222        }
223        if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
224            store.set_hostcall_fuel(fuel);
225        }
226
227        store.data_mut().limits = self.run.store_limits();
228        store.limiter(|t| &mut t.limits);
229
230        // If fuel has been configured, we want to add the configured
231        // fuel amount to this store.
232        if let Some(fuel) = self.run.common.wasm.fuel {
233            store.set_fuel(fuel)?;
234        }
235
236        Ok(store)
237    }
238
239    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
240        let mut cli = self.run.common.wasi.cli;
241
242        // Accept -Scommon as a deprecated alias for -Scli.
243        if let Some(common) = self.run.common.wasi.common {
244            if cli.is_some() {
245                bail!(
246                    "The -Scommon option should not be use with -Scli as it is a deprecated alias"
247                );
248            } else {
249                // In the future, we may add a warning here to tell users to use
250                // `-S cli` instead of `-S common`.
251                cli = Some(common);
252            }
253        }
254
255        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
256        // to serve as a signal to enable all WASI interfaces instead of just
257        // those in the `proxy` world. If `-Scli` is present then add all
258        // `command` APIs and then additionally add in the required HTTP APIs.
259        //
260        // If `-Scli` isn't passed then use the `add_to_linker_async`
261        // bindings which adds just those interfaces that the proxy interface
262        // uses.
263        if cli == Some(true) {
264            wasmtime_wasi::add_to_linker_async(linker)?;
265            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
266        } else {
267            wasmtime_wasi_http::add_to_linker_async(linker)?;
268        }
269
270        if self.run.common.wasi.nn == Some(true) {
271            #[cfg(not(feature = "wasi-nn"))]
272            {
273                bail!("support for wasi-nn was disabled at compile time");
274            }
275            #[cfg(feature = "wasi-nn")]
276            {
277                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
278                    let ctx = h.nn.as_mut().unwrap();
279                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
280                })?;
281            }
282        }
283
284        if self.run.common.wasi.runtime_config == Some(true) {
285            #[cfg(not(feature = "wasi-runtime-config"))]
286            {
287                bail!("support for wasi-runtime-config was disabled at compile time");
288            }
289            #[cfg(feature = "wasi-runtime-config")]
290            {
291                wasmtime_wasi_runtime_config::add_to_linker(linker, |h| {
292                    WasiRuntimeConfig::from(h.wasi_runtime_config.as_ref().unwrap())
293                })?;
294            }
295        }
296
297        if self.run.common.wasi.keyvalue == Some(true) {
298            #[cfg(not(feature = "wasi-keyvalue"))]
299            {
300                bail!("support for wasi-keyvalue was disabled at compile time");
301            }
302            #[cfg(feature = "wasi-keyvalue")]
303            {
304                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
305                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
306                })?;
307            }
308        }
309
310        if self.run.common.wasi.threads == Some(true) {
311            bail!("support for wasi-threads is not available with components");
312        }
313
314        if self.run.common.wasi.http == Some(false) {
315            bail!("support for wasi-http must be enabled for `serve` subcommand");
316        }
317
318        Ok(())
319    }
320
321    async fn serve(mut self) -> Result<()> {
322        use hyper::server::conn::http1;
323
324        let mut config = self
325            .run
326            .common
327            .config(None, use_pooling_allocator_by_default().unwrap_or(None))?;
328        config.wasm_component_model(true);
329        config.async_support(true);
330
331        if self.run.common.wasm.timeout.is_some() {
332            config.epoch_interruption(true);
333        }
334
335        match self.run.profile {
336            Some(Profile::Native(s)) => {
337                config.profiler(s);
338            }
339
340            // We bail early in `execute` if the guest profiler is configured.
341            Some(Profile::Guest { .. }) => unreachable!(),
342
343            None => {}
344        }
345
346        let engine = Engine::new(&config)?;
347        let mut linker = Linker::new(&engine);
348
349        self.add_to_linker(&mut linker)?;
350
351        let component = match self.run.load_module(&engine, &self.component)? {
352            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
353            RunTarget::Component(c) => c,
354        };
355
356        let instance = linker.instantiate_pre(&component)?;
357        let instance = ProxyPre::new(instance)?;
358
359        let socket = match &self.addr {
360            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
361            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
362        };
363        // Conditionally enable `SO_REUSEADDR` depending on the current
364        // platform. On Unix we want this to be able to rebind an address in
365        // the `TIME_WAIT` state which can happen then a server is killed with
366        // active TCP connections and then restarted. On Windows though if
367        // `SO_REUSEADDR` is specified then it enables multiple applications to
368        // bind the port at the same time which is not something we want. Hence
369        // this is conditionally set based on the platform (and deviates from
370        // Tokio's default from always-on).
371        socket.set_reuseaddr(!cfg!(windows))?;
372        socket.bind(self.addr)?;
373        let listener = socket.listen(100)?;
374
375        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
376
377        let _epoch_thread = if let Some(timeout) = self.run.common.wasm.timeout {
378            Some(EpochThread::spawn(
379                timeout / EPOCH_PRECISION,
380                engine.clone(),
381            ))
382        } else {
383            None
384        };
385
386        log::info!("Listening on {}", self.addr);
387
388        let handler = ProxyHandler::new(self, engine, instance);
389
390        loop {
391            let (stream, _) = listener.accept().await?;
392            let stream = TokioIo::new(stream);
393            let h = handler.clone();
394            tokio::task::spawn(async {
395                if let Err(e) = http1::Builder::new()
396                    .keep_alive(true)
397                    .serve_connection(
398                        stream,
399                        hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
400                    )
401                    .await
402                {
403                    eprintln!("error: {e:?}");
404                }
405            });
406        }
407    }
408}
409
/// Number of epoch ticks observed before a request handler is expired.
///
/// Instances may begin at any point within an epoch, and epochs are counted
/// globally per engine, so a store is only expired after `EPOCH_PRECISION + 1`
/// ticks have been observed. The maximum overshoot is therefore
/// `timeout / EPOCH_PRECISION`, which is preferable to expiring early.
const EPOCH_PRECISION: u32 = 10;
415
/// Handle to the background thread that periodically increments the engine's
/// epoch; the thread is stopped and joined when this handle is dropped.
struct EpochThread {
    // Set to `true` to ask the worker loop to exit.
    shutdown: Arc<AtomicBool>,
    // Join handle for the worker; `None` once it has been joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
420
421impl EpochThread {
422    fn spawn(timeout: std::time::Duration, engine: Engine) -> Self {
423        let shutdown = Arc::new(AtomicBool::new(false));
424        let handle = {
425            let shutdown = Arc::clone(&shutdown);
426            let handle = std::thread::spawn(move || {
427                while !shutdown.load(Ordering::Relaxed) {
428                    std::thread::sleep(timeout);
429                    engine.increment_epoch();
430                }
431            });
432            Some(handle)
433        };
434
435        EpochThread { shutdown, handle }
436    }
437}
438
439impl Drop for EpochThread {
440    fn drop(&mut self) {
441        if let Some(handle) = self.handle.take() {
442            self.shutdown.store(true, Ordering::Relaxed);
443            handle.join().unwrap();
444        }
445    }
446}
447
/// Shared state behind `ProxyHandler`: the parsed command, the engine, the
/// pre-instantiated proxy component, and the request-id counter.
struct ProxyHandlerInner {
    cmd: ServeCommand,
    engine: Engine,
    instance_pre: ProxyPre<Host>,
    // Monotonically increasing id handed to each incoming request.
    next_id: AtomicU64,
}
454
impl ProxyHandlerInner {
    /// Allocate the next request id (atomic fetch-add, wraps on overflow).
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
460
/// Cheaply-cloneable handle to the shared server state; one clone is made
/// per accepted connection.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
463
464impl ProxyHandler {
465    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
466        Self(Arc::new(ProxyHandlerInner {
467            cmd,
468            engine,
469            instance_pre,
470            next_id: AtomicU64::from(0),
471        }))
472    }
473}
474
475type Request = hyper::Request<hyper::body::Incoming>;
476
477async fn handle_request(
478    ProxyHandler(inner): ProxyHandler,
479    req: Request,
480) -> Result<hyper::Response<HyperOutgoingBody>> {
481    let (sender, receiver) = tokio::sync::oneshot::channel();
482
483    let req_id = inner.next_req_id();
484
485    log::info!(
486        "Request {req_id} handling {} to {}",
487        req.method(),
488        req.uri()
489    );
490
491    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
492
493    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
494    let out = store.data_mut().new_response_outparam(sender)?;
495    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
496
497    let task = tokio::task::spawn(async move {
498        if let Err(e) = proxy
499            .wasi_http_incoming_handler()
500            .call_handle(store, req, out)
501            .await
502        {
503            log::error!("[{req_id}] :: {:#?}", e);
504            return Err(e);
505        }
506
507        Ok(())
508    });
509
510    match receiver.await {
511        Ok(Ok(resp)) => Ok(resp),
512        Ok(Err(e)) => Err(e.into()),
513        Err(_) => {
514            // An error in the receiver (`RecvError`) only indicates that the
515            // task exited before a response was sent (i.e., the sender was
516            // dropped); it does not describe the underlying cause of failure.
517            // Instead we retrieve and propagate the error from inside the task
518            // which should more clearly tell the user what went wrong. Note
519            // that we assume the task has already exited at this point so the
520            // `await` should resolve immediately.
521            let e = match task.await {
522                Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"),
523                Err(e) => e.into(),
524            };
525            bail!("guest never invoked `response-outparam::set` method: {e:?}")
526        }
527    }
528}
529
/// Which host standard stream guest output is forwarded to.
#[derive(Clone)]
enum Output {
    /// Forward to the host process's stdout.
    Stdout,
    /// Forward to the host process's stderr.
    Stderr,
}
535
536impl Output {
537    fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
538        use std::io::Write;
539
540        match self {
541            Output::Stdout => std::io::stdout().write_all(buf),
542            Output::Stderr => std::io::stderr().write_all(buf),
543        }
544        .map_err(|e| anyhow!(e))
545    }
546}
547
/// Output stream that prefixes each line of guest output with a per-request
/// tag before forwarding it to the host's stdout or stderr.
#[derive(Clone)]
struct LogStream {
    // Tag prepended at the start of every line, e.g. "stdout [7] :: ".
    prefix: String,
    // Destination host stream.
    output: Output,
    // True when the next write should start with the prefix (at the start of
    // the stream, and again after each newline written).
    needs_prefix_on_next_write: bool,
}
554
555impl LogStream {
556    fn new(prefix: String, output: Output) -> LogStream {
557        LogStream {
558            prefix,
559            output,
560            needs_prefix_on_next_write: true,
561        }
562    }
563}
564
565impl wasmtime_wasi::StdoutStream for LogStream {
566    fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
567        Box::new(self.clone())
568    }
569
570    fn isatty(&self) -> bool {
571        use std::io::IsTerminal;
572
573        match &self.output {
574            Output::Stdout => std::io::stdout().is_terminal(),
575            Output::Stderr => std::io::stderr().is_terminal(),
576        }
577    }
578}
579
580impl wasmtime_wasi::HostOutputStream for LogStream {
581    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
582        let mut bytes = &bytes[..];
583
584        while !bytes.is_empty() {
585            if self.needs_prefix_on_next_write {
586                self.output
587                    .write_all(self.prefix.as_bytes())
588                    .map_err(StreamError::LastOperationFailed)?;
589                self.needs_prefix_on_next_write = false;
590            }
591            match bytes.iter().position(|b| *b == b'\n') {
592                Some(i) => {
593                    let (a, b) = bytes.split_at(i + 1);
594                    bytes = b;
595                    self.output
596                        .write_all(a)
597                        .map_err(StreamError::LastOperationFailed)?;
598                    self.needs_prefix_on_next_write = true;
599                }
600                None => {
601                    self.output
602                        .write_all(bytes)
603                        .map_err(StreamError::LastOperationFailed)?;
604                    break;
605                }
606            }
607        }
608
609        Ok(())
610    }
611
612    fn flush(&mut self) -> StreamResult<()> {
613        Ok(())
614    }
615
616    fn check_write(&mut self) -> StreamResult<usize> {
617        Ok(1024 * 1024)
618    }
619}
620
// Writes above complete synchronously, so this stream is always ready for
// more bytes.
#[async_trait::async_trait]
impl wasmtime_wasi::Subscribe for LogStream {
    async fn ready(&mut self) {}
}
625
626/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
627/// try to use it when we can. The main cost of the pooling allocator, however,
628/// is the virtual memory required to run it. Not all systems support the same
629/// amount of virtual memory, for example some aarch64 and riscv64 configuration
630/// only support 39 bits of virtual address space.
631///
632/// The pooling allocator, by default, will request 1000 linear memories each
633/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
634/// being about 42 bits of the address space. This exceeds the 39 bit limit of
635/// some systems, so there the pooling allocator will fail by default.
636///
637/// This function attempts to dynamically determine the hint for the pooling
638/// allocator. This returns `Some(true)` if the pooling allocator should be used
639/// by default, or `None` or an error otherwise.
640///
641/// The method for testing this is to allocate a 0-sized 64-bit linear memory
642/// with a maximum size that's N bits large where we force all memories to be
643/// static. This should attempt to acquire N bits of the virtual address space.
644/// If successful that should mean that the pooling allocator is OK to use, but
645/// if it fails then the pooling allocator is not used and the normal mmap-based
646/// implementation is used instead.
647fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
648    const BITS_TO_TEST: u32 = 42;
649    let mut config = Config::new();
650    config.wasm_memory64(true);
651    config.static_memory_maximum_size(1 << BITS_TO_TEST);
652    let engine = Engine::new(&config)?;
653    let mut store = Store::new(&engine, ());
654    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
655    // page size here from the maximum size.
656    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
657    if Memory::new(&mut store, ty).is_ok() {
658        Ok(Some(true))
659    } else {
660        Ok(None)
661    }
662}