wasmtime_cli/commands/serve.rs

use crate::common::{Profile, RunCommon, RunTarget};
use anyhow::{anyhow, bail, Result};
use clap::Parser;
use std::net::SocketAddr;
use std::{
    path::PathBuf,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
};
use wasmtime::component::Linker;
use wasmtime::{Engine, Store, StoreLimits};
use wasmtime_wasi::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi_http::bindings::http::types::Scheme;
use wasmtime_wasi_http::bindings::ProxyPre;
use wasmtime_wasi_http::io::TokioIo;
use wasmtime_wasi_http::{
    body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView, DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS,
    DEFAULT_OUTGOING_BODY_CHUNK_SIZE,
};

#[cfg(feature = "wasi-config")]
use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
#[cfg(feature = "wasi-keyvalue")]
use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
#[cfg(feature = "wasi-nn")]
use wasmtime_wasi_nn::wit::WasiNnCtx;

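/// Per-request store data for the server: the WASI and wasi-http contexts
/// plus the optional feature-gated contexts, rebuilt from the CLI
/// configuration for every incoming request in `ServeCommand::new_store`.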
struct Host {
    table: wasmtime::component::ResourceTable,
    ctx: WasiCtx,
    http: WasiHttpCtx,
    http_outgoing_body_buffer_chunks: Option<usize>,
    http_outgoing_body_chunk_size: Option<usize>,

    limits: StoreLimits,

    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,
}

impl IoView for Host {
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }
}
impl WasiView for Host {
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.ctx
    }
}

impl WasiHttpView for Host {
    fn ctx(&mut self) -> &mut WasiHttpCtx {
        &mut self.http
    }

    fn outgoing_body_buffer_chunks(&mut self) -> usize {
        self.http_outgoing_body_buffer_chunks
            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
    }

    fn outgoing_body_chunk_size(&mut self) -> usize {
        self.http_outgoing_body_chunk_size
            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
    }
}

const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
    8080,
);

/// Runs a wasi-http proxy WebAssembly component as a web server
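///
/// A minimal invocation might look like the following, assuming
/// `component.wasm` is a component targeting the wasi-http `proxy` world
/// (the path and address here are only illustrative):
///
/// ```text
/// wasmtime serve --addr 0.0.0.0:8080 component.wasm
/// ```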
#[derive(Parser)]
pub struct ServeCommand {
    #[command(flatten)]
    run: RunCommon,

    /// Socket address for the web server to bind to.
    #[arg(long = "addr", value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    /// Disable log prefixes of wasi-http handlers.
    /// If unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '.
    #[arg(long = "no-logging-prefix")]
    no_logging_prefix: bool,

    /// The WebAssembly component to run.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}

impl ServeCommand {
    /// Start a server to run the given wasi-http proxy component
    pub fn execute(mut self) -> Result<()> {
        self.run.common.init_logging()?;

        // We surface CLI errors before starting to listen for connections so
        // that we don't accidentally delay them until the first request.
        if let Some(Profile::Guest { .. }) = &self.run.profile {
            bail!("Cannot use the guest profiler with components");
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("wasi-threads does not support components yet")
        }

        // The serve command requires both wasi-http and the component model, so
        // we enable those by default here.
        if self.run.common.wasi.http.replace(true) == Some(false) {
            bail!("wasi-http is required for the serve command, and must not be disabled");
        }
        if self.run.common.wasm.component_model.replace(true) == Some(false) {
            bail!("components are required for the serve command, and must not be disabled");
        }

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .build()?;

        runtime.block_on(async move {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    Ok::<_, anyhow::Error>(())
                }

                res = self.serve() => {
                    res
                }
            }
        })?;

        Ok(())
    }

    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        builder.env("REQUEST_ID", req_id.to_string());

        let stdout_prefix: String;
        let stderr_prefix: String;
        if self.no_logging_prefix {
            stdout_prefix = "".to_string();
            stderr_prefix = "".to_string();
        } else {
            stdout_prefix = format!("stdout [{req_id}] :: ");
            stderr_prefix = format!("stderr [{req_id}] :: ");
        }
        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

        let mut host = Host {
            table: wasmtime::component::ResourceTable::new(),
            ctx: builder.build(),
            http: WasiHttpCtx::new(),
            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,

            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-config")]
            wasi_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
        };

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(feature = "wasi-config")]
            {
                let vars = WasiConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        if self.run.common.wasm.timeout.is_some() {
            store.set_epoch_deadline(u64::from(EPOCH_PRECISION) + 1);
        }

        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        // If fuel has been configured, we want to add the configured
        // fuel amount to this store.
        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }

    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        let mut cli = self.run.common.wasi.cli;

        // Accept -Scommon as a deprecated alias for -Scli.
        if let Some(common) = self.run.common.wasi.common {
            if cli.is_some() {
                bail!(
                    "The -Scommon option is a deprecated alias for -Scli and should not be used together with it"
                );
            } else {
                // In the future, we may add a warning here to tell users to use
                // `-S cli` instead of `-S common`.
                cli = Some(common);
            }
        }

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings, which add just those interfaces that the `proxy` world
        // uses.
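        //
        // For example, a component served with `-S cli` may additionally
        // import interfaces such as `wasi:cli/environment` or
        // `wasi:filesystem/types`, which are not part of the `proxy` world.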
        if cli == Some(true) {
            let link_options = self.run.compute_wasi_features();
            wasmtime_wasi::add_to_linker_with_options_async(linker, &link_options)?;
            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
        } else {
            wasmtime_wasi_http::add_to_linker_async(linker)?;
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }

    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);
        config.async_support(true);

        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }

            // We bail early in `execute` if the guest profiler is configured.
            Some(Profile::Guest { .. }) => unreachable!(),

            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        let instance = linker.instantiate_pre(&component)?;
        let instance = ProxyPre::new(instance)?;

        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Conditionally enable `SO_REUSEADDR` depending on the current
        // platform. On Unix we want this in order to be able to rebind an
        // address in the `TIME_WAIT` state, which can happen when a server is
        // killed with active TCP connections and then restarted. On Windows,
        // though, specifying `SO_REUSEADDR` enables multiple applications to
        // bind the port at the same time, which is not something we want.
        // Hence this is conditionally set based on the platform (and deviates
        // from Tokio's default of always-on).
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        let _epoch_thread = if let Some(timeout) = self.run.common.wasm.timeout {
            Some(EpochThread::spawn(
                timeout / EPOCH_PRECISION,
                engine.clone(),
            ))
        } else {
            None
        };

        log::info!("Listening on {}", self.addr);

        let handler = ProxyHandler::new(self, engine, instance);

        loop {
            let (stream, _) = listener.accept().await?;
            let stream = TokioIo::new(stream);
            let h = handler.clone();
            tokio::task::spawn(async {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
            });
        }
    }
}

/// This is the number of epochs that we will observe before expiring a request handler. As
/// instances may be started at any point within an epoch, and epochs are counted globally per
/// engine, we expire after `EPOCH_PRECISION + 1` epochs have been observed. This gives a maximum
/// overshoot of `timeout / EPOCH_PRECISION`, which is more desirable than expiring early.
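///
/// For illustration (worked from this file's own logic, not an external
/// guarantee): with a configured timeout of 1s, the epoch thread below ticks
/// every `1s / EPOCH_PRECISION = 100ms` and each store's deadline is
/// `EPOCH_PRECISION + 1 = 11` ticks, so a request is interrupted between
/// 1.0s and 1.1s after its store is created.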
const EPOCH_PRECISION: u32 = 10;

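/// A background thread that increments the engine's epoch once per interval
/// and shuts down (via the `shutdown` flag) when dropped.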
struct EpochThread {
    shutdown: Arc<AtomicBool>,
    handle: Option<std::thread::JoinHandle<()>>,
}

impl EpochThread {
    fn spawn(timeout: std::time::Duration, engine: Engine) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let handle = {
            let shutdown = Arc::clone(&shutdown);
            let handle = std::thread::spawn(move || {
                while !shutdown.load(Ordering::Relaxed) {
                    std::thread::sleep(timeout);
                    engine.increment_epoch();
                }
            });
            Some(handle)
        };

        EpochThread { shutdown, handle }
    }
}

impl Drop for EpochThread {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.shutdown.store(true, Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
}

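/// State shared by every connection task: the parsed CLI command, the engine,
/// the pre-instantiated proxy component, and an atomic counter used to hand
/// out per-request IDs.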
struct ProxyHandlerInner {
    cmd: ServeCommand,
    engine: Engine,
    instance_pre: ProxyPre<Host>,
    next_id: AtomicU64,
}

impl ProxyHandlerInner {
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}

#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);

impl ProxyHandler {
    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            cmd,
            engine,
            instance_pre,
            next_id: AtomicU64::from(0),
        }))
    }
}

type Request = hyper::Request<hyper::body::Incoming>;

async fn handle_request(
    ProxyHandler(inner): ProxyHandler,
    req: Request,
) -> Result<hyper::Response<HyperOutgoingBody>> {
    let (sender, receiver) = tokio::sync::oneshot::channel();

    let req_id = inner.next_req_id();

    log::info!(
        "Request {req_id} handling {} to {}",
        req.method(),
        req.uri()
    );

    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;

    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
    let out = store.data_mut().new_response_outparam(sender)?;
    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;

    let task = tokio::task::spawn(async move {
        if let Err(e) = proxy
            .wasi_http_incoming_handler()
            .call_handle(store, req, out)
            .await
        {
            log::error!("[{req_id}] :: {:?}", e);
            return Err(e);
        }

        Ok(())
    });

    match receiver.await {
        Ok(Ok(resp)) => Ok(resp),
        Ok(Err(e)) => Err(e.into()),
        Err(_) => {
            // An error in the receiver (`RecvError`) only indicates that the
            // task exited before a response was sent (i.e., the sender was
            // dropped); it does not describe the underlying cause of failure.
            // Instead we retrieve and propagate the error from inside the task
            // which should more clearly tell the user what went wrong. Note
            // that we assume the task has already exited at this point so the
            // `await` should resolve immediately.
            let e = match task.await {
                Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"),
                Err(e) => e.into(),
            };
            return Err(e.context("guest never invoked `response-outparam::set` method"));
        }
    }
}

#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
        use std::io::Write;

        match self {
            Output::Stdout => std::io::stdout().write_all(buf),
            Output::Stderr => std::io::stderr().write_all(buf),
        }
        .map_err(|e| anyhow!(e))
    }
}

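/// Wraps stdout/stderr so that each line written by the guest is prefixed.
/// For example, a guest writing `hello\n` while handling request 3 shows up on
/// the host as `stdout [3] :: hello` (unless `--no-logging-prefix` is passed).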
#[derive(Clone)]
struct LogStream {
    prefix: String,
    output: Output,
    needs_prefix_on_next_write: bool,
}

impl LogStream {
    fn new(prefix: String, output: Output) -> LogStream {
        LogStream {
            prefix,
            output,
            needs_prefix_on_next_write: true,
        }
    }
}

impl wasmtime_wasi::StdoutStream for LogStream {
    fn stream(&self) -> Box<dyn wasmtime_wasi::OutputStream> {
        Box::new(self.clone())
    }

    fn isatty(&self) -> bool {
        use std::io::IsTerminal;

        match &self.output {
            Output::Stdout => std::io::stdout().is_terminal(),
            Output::Stderr => std::io::stderr().is_terminal(),
        }
    }
}

impl wasmtime_wasi::OutputStream for LogStream {
    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
        let mut bytes = &bytes[..];

        while !bytes.is_empty() {
            if self.needs_prefix_on_next_write {
                self.output
                    .write_all(self.prefix.as_bytes())
                    .map_err(StreamError::LastOperationFailed)?;
                self.needs_prefix_on_next_write = false;
            }
            match bytes.iter().position(|b| *b == b'\n') {
                Some(i) => {
                    let (a, b) = bytes.split_at(i + 1);
                    bytes = b;
                    self.output
                        .write_all(a)
                        .map_err(StreamError::LastOperationFailed)?;
                    self.needs_prefix_on_next_write = true;
                }
                None => {
                    self.output
                        .write_all(bytes)
                        .map_err(StreamError::LastOperationFailed)?;
                    break;
                }
            }
        }

        Ok(())
    }

    fn flush(&mut self) -> StreamResult<()> {
        Ok(())
    }

    fn check_write(&mut self) -> StreamResult<usize> {
        Ok(1024 * 1024)
    }
}

#[async_trait::async_trait]
impl wasmtime_wasi::Pollable for LogStream {
    async fn ready(&mut self) {}
}

/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
/// try to use it when we can. The main cost of the pooling allocator, however,
/// is the virtual memory required to run it. Not all systems support the same
/// amount of virtual memory, for example some aarch64 and riscv64 configurations
/// only support 39 bits of virtual address space.
///
/// The pooling allocator, by default, will request 1000 linear memories each
/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
/// being about 42 bits of the address space. This exceeds the 39 bit limit of
/// some systems, so on those systems the pooling allocator will fail by default.
///
/// This function attempts to dynamically determine the hint for the pooling
/// allocator. This returns `Some(true)` if the pooling allocator should be used
/// by default, or `None` or an error otherwise.
///
/// The method for testing this is to allocate a 0-sized 64-bit linear memory
/// with a maximum size that's N bits large where we force all memories to be
/// static. This should attempt to acquire N bits of the virtual address space.
/// If successful that should mean that the pooling allocator is OK to use, but
/// if it fails then the pooling allocator is not used and the normal mmap-based
/// implementation is used instead.
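///
/// As a rough worked example of the numbers above: 1000 memories at 6G each is
/// about 6T of address space (roughly 2^42.6 bytes), which is why the probe
/// below reserves a 2^42-byte (4T) memory as an approximation of that
/// requirement.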
fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
    use wasmtime::{Config, Memory, MemoryType};
    const BITS_TO_TEST: u32 = 42;
    let mut config = Config::new();
    config.wasm_memory64(true);
    config.memory_reservation(1 << BITS_TO_TEST);
    let engine = Engine::new(&config)?;
    let mut store = Store::new(&engine, ());
    // NB: the maximum size is specified in wasm pages, so the 16 bits of wasm
    // page size are subtracted from `BITS_TO_TEST` here.
    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
    if Memory::new(&mut store, ty).is_ok() {
        Ok(Some(true))
    } else {
        Ok(None)
    }
}