wasmtime_cli/commands/
serve.rs

1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use bytes::Bytes;
4use clap::Parser;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::BoxBody;
8use std::convert::Infallible;
9use std::net::SocketAddr;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Instant;
13use std::{
14    path::PathBuf,
15    sync::{
16        Arc, Mutex,
17        atomic::{AtomicBool, AtomicU64, Ordering},
18    },
19    time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker, ResourceTable};
24use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
25use wasmtime_wasi::p2::{StreamError, StreamResult};
26use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
27use wasmtime_wasi_http::bindings as p2;
28use wasmtime_wasi_http::io::TokioIo;
29use wasmtime_wasi_http::{
30    DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
31    WasiHttpView,
32};
33
34#[cfg(feature = "wasi-config")]
35use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
36#[cfg(feature = "wasi-keyvalue")]
37use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
38#[cfg(feature = "wasi-nn")]
39use wasmtime_wasi_nn::wit::WasiNnCtx;
40
41struct Host {
42    table: wasmtime::component::ResourceTable,
43    ctx: WasiCtx,
44    http: WasiHttpCtx,
45    http_outgoing_body_buffer_chunks: Option<usize>,
46    http_outgoing_body_chunk_size: Option<usize>,
47
48    #[cfg(feature = "component-model-async")]
49    p3_http: DefaultP3Ctx,
50
51    limits: StoreLimits,
52
53    #[cfg(feature = "wasi-nn")]
54    nn: Option<WasiNnCtx>,
55
56    #[cfg(feature = "wasi-config")]
57    wasi_config: Option<WasiConfigVariables>,
58
59    #[cfg(feature = "wasi-keyvalue")]
60    wasi_keyvalue: Option<WasiKeyValueCtx>,
61
62    #[cfg(feature = "profiling")]
63    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
64}
65
66impl WasiView for Host {
67    fn ctx(&mut self) -> WasiCtxView<'_> {
68        WasiCtxView {
69            ctx: &mut self.ctx,
70            table: &mut self.table,
71        }
72    }
73}
74
75impl WasiHttpView for Host {
76    fn ctx(&mut self) -> &mut WasiHttpCtx {
77        &mut self.http
78    }
79    fn table(&mut self) -> &mut ResourceTable {
80        &mut self.table
81    }
82
83    fn outgoing_body_buffer_chunks(&mut self) -> usize {
84        self.http_outgoing_body_buffer_chunks
85            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
86    }
87
88    fn outgoing_body_chunk_size(&mut self) -> usize {
89        self.http_outgoing_body_chunk_size
90            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
91    }
92}
93
94#[cfg(feature = "component-model-async")]
95struct DefaultP3Ctx;
96#[cfg(feature = "component-model-async")]
97impl wasmtime_wasi_http::p3::WasiHttpCtx for DefaultP3Ctx {}
98
99#[cfg(feature = "component-model-async")]
100impl wasmtime_wasi_http::p3::WasiHttpView for Host {
101    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
102        wasmtime_wasi_http::p3::WasiHttpCtxView {
103            table: &mut self.table,
104            ctx: &mut self.p3_http,
105        }
106    }
107}
108
109const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
110    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
111    8080,
112);
113
114/// Runs a WebAssembly module
115#[derive(Parser)]
116pub struct ServeCommand {
117    #[command(flatten)]
118    run: RunCommon,
119
120    /// Socket address for the web server to bind to.
121    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
122    addr: SocketAddr,
123
124    /// Socket address where, when connected to, will initiate a graceful
125    /// shutdown.
126    ///
127    /// Note that graceful shutdown is also supported on ctrl-c.
128    #[arg(long, value_name = "SOCKADDR")]
129    shutdown_addr: Option<SocketAddr>,
130
131    /// Disable log prefixes of wasi-http handlers.
132    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
133    #[arg(long)]
134    no_logging_prefix: bool,
135
136    /// The WebAssembly component to run.
137    #[arg(value_name = "WASM", required = true)]
138    component: PathBuf,
139}
140
141impl ServeCommand {
142    /// Start a server to run the given wasi-http proxy component
143    pub fn execute(mut self) -> Result<()> {
144        self.run.common.init_logging()?;
145
146        // We force cli errors before starting to listen for connections so then
147        // we don't accidentally delay them to the first request.
148
149        if self.run.common.wasi.nn == Some(true) {
150            #[cfg(not(feature = "wasi-nn"))]
151            {
152                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
153            }
154        }
155
156        if self.run.common.wasi.threads == Some(true) {
157            bail!("wasi-threads does not support components yet")
158        }
159
160        // The serve command requires both wasi-http and the component model, so
161        // we enable those by default here.
162        if self.run.common.wasi.http.replace(true) == Some(false) {
163            bail!("wasi-http is required for the serve command, and must not be disabled");
164        }
165        if self.run.common.wasm.component_model.replace(true) == Some(false) {
166            bail!("components are required for the serve command, and must not be disabled");
167        }
168
169        let runtime = tokio::runtime::Builder::new_multi_thread()
170            .enable_time()
171            .enable_io()
172            .build()?;
173
174        runtime.block_on(self.serve())?;
175
176        Ok(())
177    }
178
179    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
180        let mut builder = WasiCtxBuilder::new();
181        self.run.configure_wasip2(&mut builder)?;
182
183        builder.env("REQUEST_ID", req_id.to_string());
184
185        let stdout_prefix: String;
186        let stderr_prefix: String;
187        if self.no_logging_prefix {
188            stdout_prefix = "".to_string();
189            stderr_prefix = "".to_string();
190        } else {
191            stdout_prefix = format!("stdout [{req_id}] :: ");
192            stderr_prefix = format!("stderr [{req_id}] :: ");
193        }
194        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
195        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
196
197        let mut host = Host {
198            table: wasmtime::component::ResourceTable::new(),
199            ctx: builder.build(),
200            http: WasiHttpCtx::new(),
201            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
202            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
203
204            limits: StoreLimits::default(),
205
206            #[cfg(feature = "wasi-nn")]
207            nn: None,
208            #[cfg(feature = "wasi-config")]
209            wasi_config: None,
210            #[cfg(feature = "wasi-keyvalue")]
211            wasi_keyvalue: None,
212            #[cfg(feature = "profiling")]
213            guest_profiler: None,
214            #[cfg(feature = "component-model-async")]
215            p3_http: DefaultP3Ctx,
216        };
217
218        if self.run.common.wasi.nn == Some(true) {
219            #[cfg(feature = "wasi-nn")]
220            {
221                let graphs = self
222                    .run
223                    .common
224                    .wasi
225                    .nn_graph
226                    .iter()
227                    .map(|g| (g.format.clone(), g.dir.clone()))
228                    .collect::<Vec<_>>();
229                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
230                host.nn.replace(WasiNnCtx::new(backends, registry));
231            }
232        }
233
234        if self.run.common.wasi.config == Some(true) {
235            #[cfg(feature = "wasi-config")]
236            {
237                let vars = WasiConfigVariables::from_iter(
238                    self.run
239                        .common
240                        .wasi
241                        .config_var
242                        .iter()
243                        .map(|v| (v.key.clone(), v.value.clone())),
244                );
245                host.wasi_config.replace(vars);
246            }
247        }
248
249        if self.run.common.wasi.keyvalue == Some(true) {
250            #[cfg(feature = "wasi-keyvalue")]
251            {
252                let ctx = WasiKeyValueCtxBuilder::new()
253                    .in_memory_data(
254                        self.run
255                            .common
256                            .wasi
257                            .keyvalue_in_memory_data
258                            .iter()
259                            .map(|v| (v.key.clone(), v.value.clone())),
260                    )
261                    .build();
262                host.wasi_keyvalue.replace(ctx);
263            }
264        }
265
266        let mut store = Store::new(engine, host);
267
268        store.data_mut().limits = self.run.store_limits();
269        store.limiter(|t| &mut t.limits);
270
271        // If fuel has been configured, we want to add the configured
272        // fuel amount to this store.
273        if let Some(fuel) = self.run.common.wasm.fuel {
274            store.set_fuel(fuel)?;
275        }
276
277        Ok(store)
278    }
279
280    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
281        self.run.validate_p3_option()?;
282        let cli = self.run.validate_cli_enabled()?;
283
284        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
285        // to serve as a signal to enable all WASI interfaces instead of just
286        // those in the `proxy` world. If `-Scli` is present then add all
287        // `command` APIs and then additionally add in the required HTTP APIs.
288        //
289        // If `-Scli` isn't passed then use the `add_to_linker_async`
290        // bindings which adds just those interfaces that the proxy interface
291        // uses.
292        if cli == Some(true) {
293            self.run.add_wasmtime_wasi_to_linker(linker)?;
294            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
295            #[cfg(feature = "component-model-async")]
296            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
297                wasmtime_wasi_http::p3::add_to_linker(linker)?;
298            }
299        } else {
300            wasmtime_wasi_http::add_to_linker_async(linker)?;
301            #[cfg(feature = "component-model-async")]
302            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
303                wasmtime_wasi_http::p3::add_to_linker(linker)?;
304                wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
305                wasmtime_wasi::p3::random::add_to_linker(linker)?;
306                wasmtime_wasi::p3::cli::add_to_linker(linker)?;
307            }
308        }
309
310        if self.run.common.wasi.nn == Some(true) {
311            #[cfg(not(feature = "wasi-nn"))]
312            {
313                bail!("support for wasi-nn was disabled at compile time");
314            }
315            #[cfg(feature = "wasi-nn")]
316            {
317                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
318                    let ctx = h.nn.as_mut().unwrap();
319                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
320                })?;
321            }
322        }
323
324        if self.run.common.wasi.config == Some(true) {
325            #[cfg(not(feature = "wasi-config"))]
326            {
327                bail!("support for wasi-config was disabled at compile time");
328            }
329            #[cfg(feature = "wasi-config")]
330            {
331                wasmtime_wasi_config::add_to_linker(linker, |h| {
332                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
333                })?;
334            }
335        }
336
337        if self.run.common.wasi.keyvalue == Some(true) {
338            #[cfg(not(feature = "wasi-keyvalue"))]
339            {
340                bail!("support for wasi-keyvalue was disabled at compile time");
341            }
342            #[cfg(feature = "wasi-keyvalue")]
343            {
344                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
345                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
346                })?;
347            }
348        }
349
350        if self.run.common.wasi.threads == Some(true) {
351            bail!("support for wasi-threads is not available with components");
352        }
353
354        if self.run.common.wasi.http == Some(false) {
355            bail!("support for wasi-http must be enabled for `serve` subcommand");
356        }
357
358        Ok(())
359    }
360
361    async fn serve(mut self) -> Result<()> {
362        use hyper::server::conn::http1;
363
364        let mut config = self
365            .run
366            .common
367            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
368        config.wasm_component_model(true);
369        config.async_support(true);
370
371        if self.run.common.wasm.timeout.is_some() {
372            config.epoch_interruption(true);
373        }
374
375        match self.run.profile {
376            Some(Profile::Native(s)) => {
377                config.profiler(s);
378            }
379            Some(Profile::Guest { .. }) => {
380                config.epoch_interruption(true);
381            }
382            None => {}
383        }
384
385        let engine = Engine::new(&config)?;
386        let mut linker = Linker::new(&engine);
387
388        self.add_to_linker(&mut linker)?;
389
390        let component = match self.run.load_module(&engine, &self.component)? {
391            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
392            RunTarget::Component(c) => c,
393        };
394
395        let instance = linker.instantiate_pre(&component)?;
396        #[cfg(feature = "component-model-async")]
397        let instance = match wasmtime_wasi_http::p3::bindings::ProxyIndices::new(&instance) {
398            Ok(indices) => ProxyPre::P3(indices, instance),
399            Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
400        };
401        #[cfg(not(feature = "component-model-async"))]
402        let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
403
404        // Spawn background task(s) waiting for graceful shutdown signals. This
405        // always listens for ctrl-c but additionally can listen for a TCP
406        // connection to the specified address.
407        let shutdown = Arc::new(GracefulShutdown::default());
408        tokio::task::spawn({
409            let shutdown = shutdown.clone();
410            async move {
411                tokio::signal::ctrl_c().await.unwrap();
412                shutdown.requested.notify_one();
413            }
414        });
415        if let Some(addr) = self.shutdown_addr {
416            let listener = tokio::net::TcpListener::bind(addr).await?;
417            eprintln!(
418                "Listening for shutdown on tcp://{}/",
419                listener.local_addr()?
420            );
421            let shutdown = shutdown.clone();
422            tokio::task::spawn(async move {
423                let _ = listener.accept().await;
424                shutdown.requested.notify_one();
425            });
426        }
427
428        let socket = match &self.addr {
429            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
430            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
431        };
432        // Conditionally enable `SO_REUSEADDR` depending on the current
433        // platform. On Unix we want this to be able to rebind an address in
434        // the `TIME_WAIT` state which can happen then a server is killed with
435        // active TCP connections and then restarted. On Windows though if
436        // `SO_REUSEADDR` is specified then it enables multiple applications to
437        // bind the port at the same time which is not something we want. Hence
438        // this is conditionally set based on the platform (and deviates from
439        // Tokio's default from always-on).
440        socket.set_reuseaddr(!cfg!(windows))?;
441        socket.bind(self.addr)?;
442        let listener = socket.listen(100)?;
443
444        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
445
446        log::info!("Listening on {}", self.addr);
447
448        let handler = ProxyHandler::new(self, engine, instance);
449
450        loop {
451            // Wait for a socket, but also "race" against shutdown to break out
452            // of this loop. Once the graceful shutdown signal is received then
453            // this loop exits immediately.
454            let (stream, _) = tokio::select! {
455                _ = shutdown.requested.notified() => break,
456                v = listener.accept() => v?,
457            };
458            let comp = component.clone();
459            let stream = TokioIo::new(stream);
460            let h = handler.clone();
461            let shutdown_guard = shutdown.clone().increment();
462            tokio::task::spawn(async move {
463                if let Err(e) = http1::Builder::new()
464                    .keep_alive(true)
465                    .serve_connection(
466                        stream,
467                        hyper::service::service_fn(move |req| {
468                            let comp = comp.clone();
469                            let h = h.clone();
470                            async move {
471                                use http_body_util::{BodyExt, Full};
472                                match handle_request(h, req, comp).await {
473                                    Ok(r) => Ok::<_, Infallible>(r),
474                                    Err(e) => {
475                                        eprintln!("error: {e:?}");
476                                        let error_html = "\
477<!doctype html>
478<html>
479<head>
480    <title>500 Internal Server Error</title>
481</head>
482<body>
483    <center>
484        <h1>500 Internal Server Error</h1>
485        <hr>
486        wasmtime
487    </center>
488</body>
489</html>";
490                                        Ok(Response::builder()
491                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
492                                            .header("Content-Type", "text/html; charset=UTF-8")
493                                            .body(
494                                                Full::new(bytes::Bytes::from(error_html))
495                                                    .map_err(|_| unreachable!())
496                                                    .boxed(),
497                                            )
498                                            .unwrap())
499                                    }
500                                }
501                            }
502                        }),
503                    )
504                    .await
505                {
506                    eprintln!("error: {e:?}");
507                }
508                drop(shutdown_guard);
509            });
510        }
511
512        // Upon exiting the loop we'll no longer process any more incoming
513        // connections but there may still be outstanding connections
514        // processing in child tasks. If there are wait for those to complete
515        // before shutting down completely. Also enable short-circuiting this
516        // wait with a second ctrl-c signal.
517        if shutdown.close() {
518            return Ok(());
519        }
520        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
521        tokio::select! {
522            _ = tokio::signal::ctrl_c() => {}
523            _ = shutdown.complete.notified() => {}
524        }
525
526        Ok(())
527    }
528}
529
530/// Helper structure to manage graceful shutdown int he accept loop above.
531#[derive(Default)]
532struct GracefulShutdown {
533    /// Async notification that shutdown has been requested.
534    requested: Notify,
535    /// Async notification that shutdown has completed, signaled when
536    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
537    complete: Notify,
538    /// Internal state related to what's in progress when shutdown is requested.
539    state: Mutex<GracefulShutdownState>,
540}
541
542#[derive(Default)]
543struct GracefulShutdownState {
544    active_tasks: u32,
545    notify_when_done: bool,
546}
547
548impl GracefulShutdown {
549    /// Increments the number of active tasks and returns a guard indicating
550    fn increment(self: Arc<Self>) -> impl Drop {
551        struct Guard(Arc<GracefulShutdown>);
552
553        let mut state = self.state.lock().unwrap();
554        assert!(!state.notify_when_done);
555        state.active_tasks += 1;
556        drop(state);
557
558        return Guard(self);
559
560        impl Drop for Guard {
561            fn drop(&mut self) {
562                let mut state = self.0.state.lock().unwrap();
563                state.active_tasks -= 1;
564                if state.notify_when_done && state.active_tasks == 0 {
565                    self.0.complete.notify_one();
566                }
567            }
568        }
569    }
570
571    /// Flags this state as done spawning tasks and returns whether there are no
572    /// more child tasks remaining.
573    fn close(&self) -> bool {
574        let mut state = self.state.lock().unwrap();
575        state.notify_when_done = true;
576        state.active_tasks == 0
577    }
578}
579
580/// When executing with a timeout enabled, this is how frequently epoch
581/// interrupts will be executed to check for timeouts. If guest profiling
582/// is enabled, the guest epoch period will be used.
583const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
584
585struct EpochThread {
586    shutdown: Arc<AtomicBool>,
587    handle: Option<std::thread::JoinHandle<()>>,
588}
589
590impl EpochThread {
591    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
592        let shutdown = Arc::new(AtomicBool::new(false));
593        let handle = {
594            let shutdown = Arc::clone(&shutdown);
595            let handle = std::thread::spawn(move || {
596                while !shutdown.load(Ordering::Relaxed) {
597                    std::thread::sleep(interval);
598                    engine.increment_epoch();
599                }
600            });
601            Some(handle)
602        };
603
604        EpochThread { shutdown, handle }
605    }
606}
607
608impl Drop for EpochThread {
609    fn drop(&mut self) {
610        if let Some(handle) = self.handle.take() {
611            self.shutdown.store(true, Ordering::Relaxed);
612            handle.join().unwrap();
613        }
614    }
615}
616
617type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
618
619fn setup_epoch_handler(
620    cmd: &ServeCommand,
621    store: &mut Store<Host>,
622    component: Component,
623) -> Result<(WriteProfile, Option<EpochThread>)> {
624    // Profiling Enabled
625    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
626        #[cfg(feature = "profiling")]
627        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
628        #[cfg(not(feature = "profiling"))]
629        {
630            let _ = (path, interval);
631            bail!("support for profiling disabled at compile time!");
632        }
633    }
634
635    // Profiling disabled but there's a global request timeout
636    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
637        let start = Instant::now();
638        store.epoch_deadline_callback(move |_store| {
639            if start.elapsed() > timeout {
640                bail!("Timeout expired");
641            }
642            Ok(UpdateDeadline::Continue(1))
643        });
644        store.set_epoch_deadline(1);
645        let engine = store.engine().clone();
646        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
647    } else {
648        None
649    };
650
651    Ok((Box::new(|_store| {}), epoch_thread))
652}
653
654#[cfg(feature = "profiling")]
655fn setup_guest_profiler(
656    cmd: &ServeCommand,
657    store: &mut Store<Host>,
658    path: String,
659    interval: Duration,
660    component: Component,
661) -> Result<(WriteProfile, Option<EpochThread>)> {
662    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
663
664    let module_name = "<main>";
665
666    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
667        module_name,
668        interval,
669        component,
670        std::iter::empty(),
671    )));
672
673    fn sample(
674        mut store: StoreContextMut<Host>,
675        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
676    ) {
677        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
678        f(
679            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
680            store.as_context(),
681        );
682        store.data_mut().guest_profiler = Some(profiler);
683    }
684
685    // Hostcall entry/exit, etc.
686    store.call_hook(|store, kind| {
687        sample(store, |profiler, store| profiler.call_hook(store, kind));
688        Ok(())
689    });
690
691    let start = Instant::now();
692    let timeout = cmd.run.common.wasm.timeout;
693    store.epoch_deadline_callback(move |store| {
694        sample(store, |profiler, store| {
695            profiler.sample(store, std::time::Duration::ZERO)
696        });
697
698        // Originally epoch counting was used here; this is problematic in
699        // a lot of cases due to there being a lot of time (e.g. in hostcalls)
700        // when we are not expected to get sample hits.
701        if let Some(timeout) = timeout {
702            if start.elapsed() > timeout {
703                bail!("Timeout expired");
704            }
705        }
706
707        Ok(UpdateDeadline::Continue(1))
708    });
709
710    store.set_epoch_deadline(1);
711    let engine = store.engine().clone();
712    let epoch_thread = Some(EpochThread::spawn(interval, engine));
713
714    let write_profile = Box::new(move |store: &mut Store<Host>| {
715        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
716            .expect("profiling doesn't support threads yet");
717        if let Err(e) = std::fs::File::create(&path)
718            .map_err(anyhow::Error::new)
719            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
720        {
721            eprintln!("failed writing profile at {path}: {e:#}");
722        } else {
723            eprintln!();
724            eprintln!("Profile written to: {path}");
725            eprintln!("View this profile at https://profiler.firefox.com/.");
726        }
727    });
728
729    Ok((write_profile, epoch_thread))
730}
731
732struct ProxyHandlerInner {
733    cmd: ServeCommand,
734    engine: Engine,
735    instance_pre: ProxyPre,
736    next_id: AtomicU64,
737}
738
739enum ProxyPre {
740    P2(p2::ProxyPre<Host>),
741    #[cfg(feature = "component-model-async")]
742    P3(
743        wasmtime_wasi_http::p3::bindings::ProxyIndices,
744        wasmtime::component::InstancePre<Host>,
745    ),
746}
747
748impl ProxyPre {
749    async fn instantiate(&self, store: &mut Store<Host>) -> Result<Proxy> {
750        Ok(match self {
751            ProxyPre::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
752            #[cfg(feature = "component-model-async")]
753            ProxyPre::P3(indices, pre) => {
754                let instance = pre.instantiate_async(&mut *store).await?;
755                let proxy = indices.load(&mut *store, &instance)?;
756                Proxy::P3(proxy, instance)
757            }
758        })
759    }
760}
761
762enum Proxy {
763    P2(p2::Proxy),
764    #[cfg(feature = "component-model-async")]
765    P3(
766        wasmtime_wasi_http::p3::bindings::Proxy,
767        wasmtime::component::Instance,
768    ),
769}
770
771impl ProxyHandlerInner {
772    fn next_req_id(&self) -> u64 {
773        self.next_id.fetch_add(1, Ordering::Relaxed)
774    }
775}
776
777#[derive(Clone)]
778struct ProxyHandler(Arc<ProxyHandlerInner>);
779
780impl ProxyHandler {
781    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre) -> Self {
782        Self(Arc::new(ProxyHandlerInner {
783            cmd,
784            engine,
785            instance_pre,
786            next_id: AtomicU64::from(0),
787        }))
788    }
789}
790
791type Request = hyper::Request<hyper::body::Incoming>;
792
793async fn handle_request(
794    ProxyHandler(inner): ProxyHandler,
795    req: Request,
796    component: Component,
797) -> Result<hyper::Response<BoxBody<Bytes, anyhow::Error>>> {
798    let (sender, receiver) = tokio::sync::oneshot::channel();
799
800    let req_id = inner.next_req_id();
801
802    log::info!(
803        "Request {req_id} handling {} to {}",
804        req.method(),
805        req.uri()
806    );
807
808    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
809
810    let (write_profile, epoch_thread) =
811        setup_epoch_handler(&inner.cmd, &mut store, component.clone())?;
812
813    match inner.instance_pre.instantiate(&mut store).await? {
814        Proxy::P2(proxy) => {
815            let req = store
816                .data_mut()
817                .new_incoming_request(p2::http::types::Scheme::Http, req)?;
818            let out = store.data_mut().new_response_outparam(sender)?;
819            let task = tokio::task::spawn(async move {
820                if let Err(e) = proxy
821                    .wasi_http_incoming_handler()
822                    .call_handle(&mut store, req, out)
823                    .await
824                {
825                    log::error!("[{req_id}] :: {e:?}");
826                    return Err(e);
827                }
828
829                write_profile(&mut store);
830                drop(epoch_thread);
831
832                Ok(())
833            });
834
835            let result = match receiver.await {
836                Ok(Ok(resp)) => resp,
837                Ok(Err(e)) => bail!(e),
838                Err(_) => {
839                    // An error in the receiver (`RecvError`) only indicates that the
840                    // task exited before a response was sent (i.e., the sender was
841                    // dropped); it does not describe the underlying cause of failure.
842                    // Instead we retrieve and propagate the error from inside the task
843                    // which should more clearly tell the user what went wrong. Note
844                    // that we assume the task has already exited at this point so the
845                    // `await` should resolve immediately.
846                    let e = match task.await {
847                        Ok(Ok(())) => {
848                            bail!("guest never invoked `response-outparam::set` method")
849                        }
850                        Ok(Err(e)) => e,
851                        Err(e) => e.into(),
852                    };
853                    bail!(e.context("guest never invoked `response-outparam::set` method"))
854                }
855            };
856
857            Ok(result.map(|body| body.map_err(|e| e.into()).boxed()))
858        }
859        #[cfg(feature = "component-model-async")]
860        Proxy::P3(proxy, instance) => {
861            use wasmtime_wasi_http::p3::bindings::http::types::{ErrorCode, Request};
862
863            let (tx, rx) = tokio::sync::oneshot::channel();
864
865            tokio::task::spawn(async move {
866                let guest_result = instance
867                    .run_concurrent(&mut store, async move |store| {
868                        let (req, body) = req.into_parts();
869                        let body = body.map_err(ErrorCode::from_hyper_request_error);
870                        let req = http::Request::from_parts(req, body);
871                        let (request, request_io_result) = Request::from_http(req);
872                        let (res, task) = proxy.handle(store, request).await??;
873                        let res =
874                            store.with(|mut store| res.into_http(&mut store, request_io_result))?;
875
876                        _ = tx.send(res);
877
878                        task.block(store).await;
879                        anyhow::Ok(())
880                    })
881                    .await?;
882                if let Err(e) = guest_result {
883                    log::error!("[{req_id}] :: {e:?}");
884                    return Err(e);
885                }
886
887                write_profile(&mut store);
888                drop(epoch_thread);
889
890                anyhow::Ok(())
891            });
892            Ok(rx.await?.map(|body| body.map_err(|err| err.into()).boxed()))
893        }
894    }
895}
896
897#[derive(Clone)]
898enum Output {
899    Stdout,
900    Stderr,
901}
902
903impl Output {
904    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
905        use std::io::Write;
906
907        match self {
908            Output::Stdout => std::io::stdout().write_all(buf),
909            Output::Stderr => std::io::stderr().write_all(buf),
910        }
911    }
912}
913
914#[derive(Clone)]
915struct LogStream {
916    output: Output,
917    state: Arc<LogStreamState>,
918}
919
920struct LogStreamState {
921    prefix: String,
922    needs_prefix_on_next_write: AtomicBool,
923}
924
925impl LogStream {
926    fn new(prefix: String, output: Output) -> LogStream {
927        LogStream {
928            output,
929            state: Arc::new(LogStreamState {
930                prefix,
931                needs_prefix_on_next_write: AtomicBool::new(true),
932            }),
933        }
934    }
935
936    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
937        while !bytes.is_empty() {
938            if self
939                .state
940                .needs_prefix_on_next_write
941                .load(Ordering::Relaxed)
942            {
943                self.output.write_all(self.state.prefix.as_bytes())?;
944                self.state
945                    .needs_prefix_on_next_write
946                    .store(false, Ordering::Relaxed);
947            }
948            match bytes.iter().position(|b| *b == b'\n') {
949                Some(i) => {
950                    let (a, b) = bytes.split_at(i + 1);
951                    bytes = b;
952                    self.output.write_all(a)?;
953                    self.state
954                        .needs_prefix_on_next_write
955                        .store(true, Ordering::Relaxed);
956                }
957                None => {
958                    self.output.write_all(bytes)?;
959                    break;
960                }
961            }
962        }
963
964        Ok(())
965    }
966}
967
968impl wasmtime_wasi::cli::StdoutStream for LogStream {
969    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
970        Box::new(self.clone())
971    }
972    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
973        Box::new(self.clone())
974    }
975}
976
977impl wasmtime_wasi::cli::IsTerminal for LogStream {
978    fn is_terminal(&self) -> bool {
979        match &self.output {
980            Output::Stdout => std::io::stdout().is_terminal(),
981            Output::Stderr => std::io::stderr().is_terminal(),
982        }
983    }
984}
985
986impl wasmtime_wasi::p2::OutputStream for LogStream {
987    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
988        self.write_all(&bytes)
989            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
990        Ok(())
991    }
992
993    fn flush(&mut self) -> StreamResult<()> {
994        Ok(())
995    }
996
997    fn check_write(&mut self) -> StreamResult<usize> {
998        Ok(1024 * 1024)
999    }
1000}
1001
1002#[async_trait::async_trait]
1003impl wasmtime_wasi::p2::Pollable for LogStream {
1004    async fn ready(&mut self) {}
1005}
1006
1007impl AsyncWrite for LogStream {
1008    fn poll_write(
1009        mut self: Pin<&mut Self>,
1010        _cx: &mut Context<'_>,
1011        buf: &[u8],
1012    ) -> Poll<io::Result<usize>> {
1013        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1014    }
1015    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1016        Poll::Ready(Ok(()))
1017    }
1018    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1019        Poll::Ready(Ok(()))
1020    }
1021}
1022
1023/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
1024/// try to use it when we can. The main cost of the pooling allocator, however,
1025/// is the virtual memory required to run it. Not all systems support the same
1026/// amount of virtual memory, for example some aarch64 and riscv64 configuration
1027/// only support 39 bits of virtual address space.
1028///
1029/// The pooling allocator, by default, will request 1000 linear memories each
1030/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
1031/// being about 42 bits of the address space. This exceeds the 39 bit limit of
1032/// some systems, so there the pooling allocator will fail by default.
1033///
1034/// This function attempts to dynamically determine the hint for the pooling
1035/// allocator. This returns `Some(true)` if the pooling allocator should be used
1036/// by default, or `None` or an error otherwise.
1037///
1038/// The method for testing this is to allocate a 0-sized 64-bit linear memory
1039/// with a maximum size that's N bits large where we force all memories to be
1040/// static. This should attempt to acquire N bits of the virtual address space.
1041/// If successful that should mean that the pooling allocator is OK to use, but
1042/// if it fails then the pooling allocator is not used and the normal mmap-based
1043/// implementation is used instead.
1044fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1045    use wasmtime::{Config, Memory, MemoryType};
1046    const BITS_TO_TEST: u32 = 42;
1047    let mut config = Config::new();
1048    config.wasm_memory64(true);
1049    config.memory_reservation(1 << BITS_TO_TEST);
1050    let engine = Engine::new(&config)?;
1051    let mut store = Store::new(&engine, ());
1052    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
1053    // page size here from the maximum size.
1054    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1055    if Memory::new(&mut store, ty).is_ok() {
1056        Ok(Some(true))
1057    } else {
1058        Ok(None)
1059    }
1060}