wasmtime_cli/commands/
serve.rs

1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use bytes::Bytes;
4use clap::Parser;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::BoxBody;
8use std::convert::Infallible;
9use std::net::SocketAddr;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Instant;
13use std::{
14    path::PathBuf,
15    sync::{
16        Arc, Mutex,
17        atomic::{AtomicBool, AtomicU64, Ordering},
18    },
19    time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker, ResourceTable};
24use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
25use wasmtime_wasi::p2::{StreamError, StreamResult};
26use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
27use wasmtime_wasi_http::bindings as p2;
28use wasmtime_wasi_http::io::TokioIo;
29use wasmtime_wasi_http::{
30    DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
31    WasiHttpView,
32};
33
34#[cfg(feature = "wasi-config")]
35use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
36#[cfg(feature = "wasi-keyvalue")]
37use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
38#[cfg(feature = "wasi-nn")]
39use wasmtime_wasi_nn::wit::WasiNnCtx;
40
/// Host-side state stored in each `Store<Host>`.
///
/// A new value is built by `ServeCommand::new_store`, which is called once per
/// incoming request, so nothing in here is shared between requests.
struct Host {
    /// Resource table handed out by the `WasiView`/`WasiHttpView` impls below.
    table: wasmtime::component::ResourceTable,
    /// WASI context (environment, stdio, ...) built with `WasiCtxBuilder`.
    ctx: WasiCtx,
    /// wasi-http context returned by `WasiHttpView::ctx`.
    http: WasiHttpCtx,
    /// CLI override for `WasiHttpView::outgoing_body_buffer_chunks`; `None`
    /// falls back to `DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS`.
    http_outgoing_body_buffer_chunks: Option<usize>,
    /// CLI override for `WasiHttpView::outgoing_body_chunk_size`; `None`
    /// falls back to `DEFAULT_OUTGOING_BODY_CHUNK_SIZE`.
    http_outgoing_body_chunk_size: Option<usize>,

    /// Context exposed through the p3 `WasiHttpView` implementation.
    #[cfg(feature = "component-model-async")]
    p3_http: crate::common::DefaultP3Ctx,

    /// Resource limits; registered with the store via `Store::limiter`.
    limits: StoreLimits,

    /// wasi-nn state, populated only when wasi-nn is enabled on the CLI.
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    /// wasi-config variables, populated only when wasi-config is enabled.
    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    /// wasi-keyvalue state, populated only when wasi-keyvalue is enabled.
    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    /// Guest profiler, installed by `setup_guest_profiler` when guest
    /// profiling is requested.
    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
65
66impl WasiView for Host {
67    fn ctx(&mut self) -> WasiCtxView<'_> {
68        WasiCtxView {
69            ctx: &mut self.ctx,
70            table: &mut self.table,
71        }
72    }
73}
74
75impl WasiHttpView for Host {
76    fn ctx(&mut self) -> &mut WasiHttpCtx {
77        &mut self.http
78    }
79    fn table(&mut self) -> &mut ResourceTable {
80        &mut self.table
81    }
82
83    fn outgoing_body_buffer_chunks(&mut self) -> usize {
84        self.http_outgoing_body_buffer_chunks
85            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
86    }
87
88    fn outgoing_body_chunk_size(&mut self) -> usize {
89        self.http_outgoing_body_chunk_size
90            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
91    }
92}
93
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
    /// Lends out the p3 HTTP context together with the shared resource table.
    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
        let table = &mut self.table;
        let ctx = &mut self.p3_http;
        wasmtime_wasi_http::p3::WasiHttpCtxView { table, ctx }
    }
}
103
/// Address used for `--addr` when none is given: every IPv4 interface
/// (`0.0.0.0`) on port 8080.
const DEFAULT_ADDR: SocketAddr =
    SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 8080);
108
// Command-line definition of the `serve` subcommand (see `execute`, which
// starts a server for a wasi-http proxy component).
//
// NOTE(review): clap's derive turns the `///` comments below into help text,
// so they are part of the program's output and are left untouched here. The
// summary line "Runs a WebAssembly module" looks copied from the `run`
// command — confirm whether it should describe serving HTTP instead.
/// Runs a WebAssembly module
#[derive(Parser)]
pub struct ServeCommand {
    // Options shared with other commands; flattened into this command's flags.
    #[command(flatten)]
    run: RunCommon,

    /// Socket address for the web server to bind to.
    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    /// Socket address where, when connected to, will initiate a graceful
    /// shutdown.
    ///
    /// Note that graceful shutdown is also supported on ctrl-c.
    #[arg(long, value_name = "SOCKADDR")]
    shutdown_addr: Option<SocketAddr>,

    /// Disable log prefixes of wasi-http handlers.
    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
    #[arg(long)]
    no_logging_prefix: bool,

    /// The WebAssembly component to run.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}
135
136impl ServeCommand {
137    /// Start a server to run the given wasi-http proxy component
138    pub fn execute(mut self) -> Result<()> {
139        self.run.common.init_logging()?;
140
141        // We force cli errors before starting to listen for connections so then
142        // we don't accidentally delay them to the first request.
143
144        if self.run.common.wasi.nn == Some(true) {
145            #[cfg(not(feature = "wasi-nn"))]
146            {
147                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
148            }
149        }
150
151        if self.run.common.wasi.threads == Some(true) {
152            bail!("wasi-threads does not support components yet")
153        }
154
155        // The serve command requires both wasi-http and the component model, so
156        // we enable those by default here.
157        if self.run.common.wasi.http.replace(true) == Some(false) {
158            bail!("wasi-http is required for the serve command, and must not be disabled");
159        }
160        if self.run.common.wasm.component_model.replace(true) == Some(false) {
161            bail!("components are required for the serve command, and must not be disabled");
162        }
163
164        let runtime = tokio::runtime::Builder::new_multi_thread()
165            .enable_time()
166            .enable_io()
167            .build()?;
168
169        runtime.block_on(self.serve())?;
170
171        Ok(())
172    }
173
174    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
175        let mut builder = WasiCtxBuilder::new();
176        self.run.configure_wasip2(&mut builder)?;
177
178        builder.env("REQUEST_ID", req_id.to_string());
179
180        let stdout_prefix: String;
181        let stderr_prefix: String;
182        if self.no_logging_prefix {
183            stdout_prefix = "".to_string();
184            stderr_prefix = "".to_string();
185        } else {
186            stdout_prefix = format!("stdout [{req_id}] :: ");
187            stderr_prefix = format!("stderr [{req_id}] :: ");
188        }
189        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
190        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
191
192        let mut host = Host {
193            table: wasmtime::component::ResourceTable::new(),
194            ctx: builder.build(),
195            http: WasiHttpCtx::new(),
196            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
197            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
198
199            limits: StoreLimits::default(),
200
201            #[cfg(feature = "wasi-nn")]
202            nn: None,
203            #[cfg(feature = "wasi-config")]
204            wasi_config: None,
205            #[cfg(feature = "wasi-keyvalue")]
206            wasi_keyvalue: None,
207            #[cfg(feature = "profiling")]
208            guest_profiler: None,
209            #[cfg(feature = "component-model-async")]
210            p3_http: crate::common::DefaultP3Ctx,
211        };
212
213        if self.run.common.wasi.nn == Some(true) {
214            #[cfg(feature = "wasi-nn")]
215            {
216                let graphs = self
217                    .run
218                    .common
219                    .wasi
220                    .nn_graph
221                    .iter()
222                    .map(|g| (g.format.clone(), g.dir.clone()))
223                    .collect::<Vec<_>>();
224                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
225                host.nn.replace(WasiNnCtx::new(backends, registry));
226            }
227        }
228
229        if self.run.common.wasi.config == Some(true) {
230            #[cfg(feature = "wasi-config")]
231            {
232                let vars = WasiConfigVariables::from_iter(
233                    self.run
234                        .common
235                        .wasi
236                        .config_var
237                        .iter()
238                        .map(|v| (v.key.clone(), v.value.clone())),
239                );
240                host.wasi_config.replace(vars);
241            }
242        }
243
244        if self.run.common.wasi.keyvalue == Some(true) {
245            #[cfg(feature = "wasi-keyvalue")]
246            {
247                let ctx = WasiKeyValueCtxBuilder::new()
248                    .in_memory_data(
249                        self.run
250                            .common
251                            .wasi
252                            .keyvalue_in_memory_data
253                            .iter()
254                            .map(|v| (v.key.clone(), v.value.clone())),
255                    )
256                    .build();
257                host.wasi_keyvalue.replace(ctx);
258            }
259        }
260
261        let mut store = Store::new(engine, host);
262
263        store.data_mut().limits = self.run.store_limits();
264        store.limiter(|t| &mut t.limits);
265
266        // If fuel has been configured, we want to add the configured
267        // fuel amount to this store.
268        if let Some(fuel) = self.run.common.wasm.fuel {
269            store.set_fuel(fuel)?;
270        }
271
272        Ok(store)
273    }
274
    /// Registers every host interface the served component may import.
    ///
    /// wasi-http is always added; which other WASI interfaces are added
    /// depends on `-Scli` (see the comment below) and on the optional
    /// wasi-nn / wasi-config / wasi-keyvalue flags.
    ///
    /// # Errors
    ///
    /// Fails if option validation fails, if any `add_to_linker` call fails,
    /// if an extension was requested that this binary was compiled without,
    /// if wasi-threads was requested, or if wasi-http was disabled.
    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        self.run.validate_p3_option()?;
        let cli = self.run.validate_cli_enabled()?;

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings which adds just those interfaces that the proxy interface
        // uses.
        if cli == Some(true) {
            self.run.add_wasmtime_wasi_to_linker(linker)?;
            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
            #[cfg(feature = "component-model-async")]
            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
                wasmtime_wasi_http::p3::add_to_linker(linker)?;
            }
        } else {
            wasmtime_wasi_http::add_to_linker_async(linker)?;
            // In this branch the p3 clocks/random/cli interfaces are added
            // explicitly alongside p3 HTTP.
            #[cfg(feature = "component-model-async")]
            if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
                wasmtime_wasi_http::p3::add_to_linker(linker)?;
                wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
                wasmtime_wasi::p3::random::add_to_linker(linker)?;
                wasmtime_wasi::p3::cli::add_to_linker(linker)?;
            }
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    // `new_store` fills `nn` under the same `nn == Some(true)`
                    // condition, so it is expected to be `Some` here.
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    // Populated by `new_store` under the same condition.
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    // Populated by `new_store` under the same condition.
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        // `execute` already rejects these two cases; they are checked again
        // here.
        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }
355
356    async fn serve(mut self) -> Result<()> {
357        use hyper::server::conn::http1;
358
359        let mut config = self
360            .run
361            .common
362            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
363        config.wasm_component_model(true);
364        config.async_support(true);
365
366        if self.run.common.wasm.timeout.is_some() {
367            config.epoch_interruption(true);
368        }
369
370        match self.run.profile {
371            Some(Profile::Native(s)) => {
372                config.profiler(s);
373            }
374            Some(Profile::Guest { .. }) => {
375                config.epoch_interruption(true);
376            }
377            None => {}
378        }
379
380        let engine = Engine::new(&config)?;
381        let mut linker = Linker::new(&engine);
382
383        self.add_to_linker(&mut linker)?;
384
385        let component = match self.run.load_module(&engine, &self.component)? {
386            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
387            RunTarget::Component(c) => c,
388        };
389
390        let instance = linker.instantiate_pre(&component)?;
391        #[cfg(feature = "component-model-async")]
392        let instance = match wasmtime_wasi_http::p3::bindings::ProxyIndices::new(&instance) {
393            Ok(indices) => ProxyPre::P3(indices, instance),
394            Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
395        };
396        #[cfg(not(feature = "component-model-async"))]
397        let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
398
399        // Spawn background task(s) waiting for graceful shutdown signals. This
400        // always listens for ctrl-c but additionally can listen for a TCP
401        // connection to the specified address.
402        let shutdown = Arc::new(GracefulShutdown::default());
403        tokio::task::spawn({
404            let shutdown = shutdown.clone();
405            async move {
406                tokio::signal::ctrl_c().await.unwrap();
407                shutdown.requested.notify_one();
408            }
409        });
410        if let Some(addr) = self.shutdown_addr {
411            let listener = tokio::net::TcpListener::bind(addr).await?;
412            eprintln!(
413                "Listening for shutdown on tcp://{}/",
414                listener.local_addr()?
415            );
416            let shutdown = shutdown.clone();
417            tokio::task::spawn(async move {
418                let _ = listener.accept().await;
419                shutdown.requested.notify_one();
420            });
421        }
422
423        let socket = match &self.addr {
424            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
425            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
426        };
427        // Conditionally enable `SO_REUSEADDR` depending on the current
428        // platform. On Unix we want this to be able to rebind an address in
429        // the `TIME_WAIT` state which can happen then a server is killed with
430        // active TCP connections and then restarted. On Windows though if
431        // `SO_REUSEADDR` is specified then it enables multiple applications to
432        // bind the port at the same time which is not something we want. Hence
433        // this is conditionally set based on the platform (and deviates from
434        // Tokio's default from always-on).
435        socket.set_reuseaddr(!cfg!(windows))?;
436        socket.bind(self.addr)?;
437        let listener = socket.listen(100)?;
438
439        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
440
441        log::info!("Listening on {}", self.addr);
442
443        let handler = ProxyHandler::new(self, engine, instance);
444
445        loop {
446            // Wait for a socket, but also "race" against shutdown to break out
447            // of this loop. Once the graceful shutdown signal is received then
448            // this loop exits immediately.
449            let (stream, _) = tokio::select! {
450                _ = shutdown.requested.notified() => break,
451                v = listener.accept() => v?,
452            };
453            let comp = component.clone();
454            let stream = TokioIo::new(stream);
455            let h = handler.clone();
456            let shutdown_guard = shutdown.clone().increment();
457            tokio::task::spawn(async move {
458                if let Err(e) = http1::Builder::new()
459                    .keep_alive(true)
460                    .serve_connection(
461                        stream,
462                        hyper::service::service_fn(move |req| {
463                            let comp = comp.clone();
464                            let h = h.clone();
465                            async move {
466                                use http_body_util::{BodyExt, Full};
467                                match handle_request(h, req, comp).await {
468                                    Ok(r) => Ok::<_, Infallible>(r),
469                                    Err(e) => {
470                                        eprintln!("error: {e:?}");
471                                        let error_html = "\
472<!doctype html>
473<html>
474<head>
475    <title>500 Internal Server Error</title>
476</head>
477<body>
478    <center>
479        <h1>500 Internal Server Error</h1>
480        <hr>
481        wasmtime
482    </center>
483</body>
484</html>";
485                                        Ok(Response::builder()
486                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
487                                            .header("Content-Type", "text/html; charset=UTF-8")
488                                            .body(
489                                                Full::new(bytes::Bytes::from(error_html))
490                                                    .map_err(|_| unreachable!())
491                                                    .boxed(),
492                                            )
493                                            .unwrap())
494                                    }
495                                }
496                            }
497                        }),
498                    )
499                    .await
500                {
501                    eprintln!("error: {e:?}");
502                }
503                drop(shutdown_guard);
504            });
505        }
506
507        // Upon exiting the loop we'll no longer process any more incoming
508        // connections but there may still be outstanding connections
509        // processing in child tasks. If there are wait for those to complete
510        // before shutting down completely. Also enable short-circuiting this
511        // wait with a second ctrl-c signal.
512        if shutdown.close() {
513            return Ok(());
514        }
515        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
516        tokio::select! {
517            _ = tokio::signal::ctrl_c() => {}
518            _ = shutdown.complete.notified() => {}
519        }
520
521        Ok(())
522    }
523}
524
/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}
536
/// Mutable bookkeeping guarded by `GracefulShutdown::state`.
#[derive(Default)]
struct GracefulShutdownState {
    /// Number of guards from `GracefulShutdown::increment` that are still
    /// alive.
    active_tasks: u32,
    /// Set by `GracefulShutdown::close`; once `true`, the guard that brings
    /// `active_tasks` to zero signals `GracefulShutdown::complete`.
    notify_when_done: bool,
}
542
543impl GracefulShutdown {
544    /// Increments the number of active tasks and returns a guard indicating
545    fn increment(self: Arc<Self>) -> impl Drop {
546        struct Guard(Arc<GracefulShutdown>);
547
548        let mut state = self.state.lock().unwrap();
549        assert!(!state.notify_when_done);
550        state.active_tasks += 1;
551        drop(state);
552
553        return Guard(self);
554
555        impl Drop for Guard {
556            fn drop(&mut self) {
557                let mut state = self.0.state.lock().unwrap();
558                state.active_tasks -= 1;
559                if state.notify_when_done && state.active_tasks == 0 {
560                    self.0.complete.notify_one();
561                }
562            }
563        }
564    }
565
566    /// Flags this state as done spawning tasks and returns whether there are no
567    /// more child tasks remaining.
568    fn close(&self) -> bool {
569        let mut state = self.state.lock().unwrap();
570        state.notify_when_done = true;
571        state.active_tasks == 0
572    }
573}
574
/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
///
/// Passed to `EpochThread::spawn` by `setup_epoch_handler`; the guest
/// profiler path passes its own sampling interval instead.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
579
/// Handle to a background OS thread that periodically increments an engine's
/// epoch. Dropping the handle stops the thread and joins it.
struct EpochThread {
    /// Flag the thread checks on every iteration; set to `true` on drop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; taken (leaving `None`) when dropped.
    handle: Option<std::thread::JoinHandle<()>>,
}
584
585impl EpochThread {
586    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
587        let shutdown = Arc::new(AtomicBool::new(false));
588        let handle = {
589            let shutdown = Arc::clone(&shutdown);
590            let handle = std::thread::spawn(move || {
591                while !shutdown.load(Ordering::Relaxed) {
592                    std::thread::sleep(interval);
593                    engine.increment_epoch();
594                }
595            });
596            Some(handle)
597        };
598
599        EpochThread { shutdown, handle }
600    }
601}
602
603impl Drop for EpochThread {
604    fn drop(&mut self) {
605        if let Some(handle) = self.handle.take() {
606            self.shutdown.store(true, Ordering::Relaxed);
607            handle.join().unwrap();
608        }
609    }
610}
611
/// One-shot callback run after a request's guest call finishes successfully.
/// It is a no-op unless guest profiling is enabled, in which case it writes
/// the collected profile to disk.
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
613
614fn setup_epoch_handler(
615    cmd: &ServeCommand,
616    store: &mut Store<Host>,
617    component: Component,
618) -> Result<(WriteProfile, Option<EpochThread>)> {
619    // Profiling Enabled
620    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
621        #[cfg(feature = "profiling")]
622        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
623        #[cfg(not(feature = "profiling"))]
624        {
625            let _ = (path, interval);
626            bail!("support for profiling disabled at compile time!");
627        }
628    }
629
630    // Profiling disabled but there's a global request timeout
631    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
632        let start = Instant::now();
633        store.epoch_deadline_callback(move |_store| {
634            if start.elapsed() > timeout {
635                bail!("Timeout expired");
636            }
637            Ok(UpdateDeadline::Continue(1))
638        });
639        store.set_epoch_deadline(1);
640        let engine = store.engine().clone();
641        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
642    } else {
643        None
644    };
645
646    Ok((Box::new(|_store| {}), epoch_thread))
647}
648
/// Installs a guest profiler on `store` and wires it to call hooks and epoch
/// interrupts.
///
/// * `path` — file the finished profile is written to.
/// * `interval` — sampling interval given to the profiler and used as the
///   period of the returned [`EpochThread`].
/// * `component` — component handed to `GuestProfiler::new_component`.
///
/// The epoch callback also enforces `cmd`'s timeout, if one is set.
///
/// Returns the callback that writes the profile plus the epoch thread.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        module_name,
        interval,
        component,
        std::iter::empty(),
    )));

    // Temporarily moves the profiler out of the store data so that `f` can
    // receive it mutably alongside a shared view of the store, then puts it
    // back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    // Each epoch tick takes one profiler sample and then checks the timeout.
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        // Originally epoch counting was used here; this is problematic in
        // a lot of cases due to there being a lot of time (e.g. in hostcalls)
        // when we are not expected to get sample hits.
        if let Some(timeout) = timeout {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    // Takes the profiler out of the store for good and serialises it to
    // `path`; a failure to write is reported on stderr and not propagated.
    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok((write_profile, epoch_thread))
}
726
/// State shared by all request handlers, held behind the `Arc` in
/// `ProxyHandler`.
struct ProxyHandlerInner {
    /// The parsed command line; used to build each request's store.
    cmd: ServeCommand,
    /// Engine every per-request store is created in.
    engine: Engine,
    /// Pre-instantiated component, instantiated anew for every request.
    instance_pre: ProxyPre,
    /// Source of request ids; see `next_req_id`.
    next_id: AtomicU64,
}
733
/// A pre-instantiated proxy component, in either its p2 or p3 flavour.
enum ProxyPre {
    /// Component using the p2 `wasi:http` proxy bindings.
    P2(p2::ProxyPre<Host>),
    /// Component using the p3 proxy bindings: the export indices looked up
    /// once, plus the generic pre-instantiated component they apply to.
    #[cfg(feature = "component-model-async")]
    P3(
        wasmtime_wasi_http::p3::bindings::ProxyIndices,
        wasmtime::component::InstancePre<Host>,
    ),
}
742
743impl ProxyPre {
744    async fn instantiate(&self, store: &mut Store<Host>) -> Result<Proxy> {
745        Ok(match self {
746            ProxyPre::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
747            #[cfg(feature = "component-model-async")]
748            ProxyPre::P3(indices, pre) => {
749                let instance = pre.instantiate_async(&mut *store).await?;
750                let proxy = indices.load(&mut *store, &instance)?;
751                Proxy::P3(proxy, instance)
752            }
753        })
754    }
755}
756
/// An instantiated proxy component, produced by `ProxyPre::instantiate`.
enum Proxy {
    /// p2 bindings for the instance.
    P2(p2::Proxy),
    /// p3 bindings together with the underlying instance, which is needed to
    /// drive the guest with `run_concurrent`.
    #[cfg(feature = "component-model-async")]
    P3(
        wasmtime_wasi_http::p3::bindings::Proxy,
        wasmtime::component::Instance,
    ),
}
765
766impl ProxyHandlerInner {
767    fn next_req_id(&self) -> u64 {
768        self.next_id.fetch_add(1, Ordering::Relaxed)
769    }
770}
771
/// Cheaply clonable handle to the shared `ProxyHandlerInner`; one clone is
/// made per connection and per request.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
774
775impl ProxyHandler {
776    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre) -> Self {
777        Self(Arc::new(ProxyHandlerInner {
778            cmd,
779            engine,
780            instance_pre,
781            next_id: AtomicU64::from(0),
782        }))
783    }
784}
785
/// An incoming HTTP request as delivered by hyper, with a streaming body.
type Request = hyper::Request<hyper::body::Incoming>;
787
788async fn handle_request(
789    ProxyHandler(inner): ProxyHandler,
790    req: Request,
791    component: Component,
792) -> Result<hyper::Response<BoxBody<Bytes, anyhow::Error>>> {
793    let (sender, receiver) = tokio::sync::oneshot::channel();
794
795    let req_id = inner.next_req_id();
796
797    log::info!(
798        "Request {req_id} handling {} to {}",
799        req.method(),
800        req.uri()
801    );
802
803    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
804
805    let (write_profile, epoch_thread) =
806        setup_epoch_handler(&inner.cmd, &mut store, component.clone())?;
807
808    match inner.instance_pre.instantiate(&mut store).await? {
809        Proxy::P2(proxy) => {
810            let req = store
811                .data_mut()
812                .new_incoming_request(p2::http::types::Scheme::Http, req)?;
813            let out = store.data_mut().new_response_outparam(sender)?;
814            let task = tokio::task::spawn(async move {
815                if let Err(e) = proxy
816                    .wasi_http_incoming_handler()
817                    .call_handle(&mut store, req, out)
818                    .await
819                {
820                    log::error!("[{req_id}] :: {e:?}");
821                    return Err(e);
822                }
823
824                write_profile(&mut store);
825                drop(epoch_thread);
826
827                Ok(())
828            });
829
830            let result = match receiver.await {
831                Ok(Ok(resp)) => resp,
832                Ok(Err(e)) => bail!(e),
833                Err(_) => {
834                    // An error in the receiver (`RecvError`) only indicates that the
835                    // task exited before a response was sent (i.e., the sender was
836                    // dropped); it does not describe the underlying cause of failure.
837                    // Instead we retrieve and propagate the error from inside the task
838                    // which should more clearly tell the user what went wrong. Note
839                    // that we assume the task has already exited at this point so the
840                    // `await` should resolve immediately.
841                    let e = match task.await {
842                        Ok(Ok(())) => {
843                            bail!("guest never invoked `response-outparam::set` method")
844                        }
845                        Ok(Err(e)) => e,
846                        Err(e) => e.into(),
847                    };
848                    bail!(e.context("guest never invoked `response-outparam::set` method"))
849                }
850            };
851
852            Ok(result.map(|body| body.map_err(|e| e.into()).boxed()))
853        }
854        #[cfg(feature = "component-model-async")]
855        Proxy::P3(proxy, instance) => {
856            use wasmtime_wasi_http::p3::bindings::http::types::{ErrorCode, Request};
857
858            let (tx, rx) = tokio::sync::oneshot::channel();
859
860            tokio::task::spawn(async move {
861                let guest_result = instance
862                    .run_concurrent(&mut store, async move |store| {
863                        let (req, body) = req.into_parts();
864                        let body = body.map_err(ErrorCode::from_hyper_request_error);
865                        let req = http::Request::from_parts(req, body);
866                        let (request, request_io_result) = Request::from_http(req);
867                        let (res, task) = proxy.handle(store, request).await??;
868                        let res =
869                            store.with(|mut store| res.into_http(&mut store, request_io_result))?;
870
871                        _ = tx.send(res);
872
873                        task.block(store).await;
874                        anyhow::Ok(())
875                    })
876                    .await?;
877                if let Err(e) = guest_result {
878                    log::error!("[{req_id}] :: {e:?}");
879                    return Err(e);
880                }
881
882                write_profile(&mut store);
883                drop(epoch_thread);
884
885                anyhow::Ok(())
886            });
887            Ok(rx.await?.map(|body| body.map_err(|err| err.into()).boxed()))
888        }
889    }
890}
891
/// Host standard stream that forwarded bytes are written to.
#[derive(Clone)]
enum Output {
    /// The host process's standard output.
    Stdout,
    /// The host process's standard error.
    Stderr,
}

impl Output {
    /// Writes the whole of `buf` to the host stream selected by `self`.
    ///
    /// A fresh handle to the stream is obtained on every call; any error
    /// reported by the underlying write is returned unchanged.
    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
        use std::io::Write;

        match *self {
            Self::Stdout => Write::write_all(&mut std::io::stdout(), buf),
            Self::Stderr => Write::write_all(&mut std::io::stderr(), buf),
        }
    }
}
908
909#[derive(Clone)]
910struct LogStream {
911    output: Output,
912    state: Arc<LogStreamState>,
913}
914
915struct LogStreamState {
916    prefix: String,
917    needs_prefix_on_next_write: AtomicBool,
918}
919
920impl LogStream {
921    fn new(prefix: String, output: Output) -> LogStream {
922        LogStream {
923            output,
924            state: Arc::new(LogStreamState {
925                prefix,
926                needs_prefix_on_next_write: AtomicBool::new(true),
927            }),
928        }
929    }
930
931    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
932        while !bytes.is_empty() {
933            if self
934                .state
935                .needs_prefix_on_next_write
936                .load(Ordering::Relaxed)
937            {
938                self.output.write_all(self.state.prefix.as_bytes())?;
939                self.state
940                    .needs_prefix_on_next_write
941                    .store(false, Ordering::Relaxed);
942            }
943            match bytes.iter().position(|b| *b == b'\n') {
944                Some(i) => {
945                    let (a, b) = bytes.split_at(i + 1);
946                    bytes = b;
947                    self.output.write_all(a)?;
948                    self.state
949                        .needs_prefix_on_next_write
950                        .store(true, Ordering::Relaxed);
951                }
952                None => {
953                    self.output.write_all(bytes)?;
954                    break;
955                }
956            }
957        }
958
959        Ok(())
960    }
961}
962
963impl wasmtime_wasi::cli::StdoutStream for LogStream {
964    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
965        Box::new(self.clone())
966    }
967    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
968        Box::new(self.clone())
969    }
970}
971
972impl wasmtime_wasi::cli::IsTerminal for LogStream {
973    fn is_terminal(&self) -> bool {
974        match &self.output {
975            Output::Stdout => std::io::stdout().is_terminal(),
976            Output::Stderr => std::io::stderr().is_terminal(),
977        }
978    }
979}
980
981impl wasmtime_wasi::p2::OutputStream for LogStream {
982    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
983        self.write_all(&bytes)
984            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
985        Ok(())
986    }
987
988    fn flush(&mut self) -> StreamResult<()> {
989        Ok(())
990    }
991
992    fn check_write(&mut self) -> StreamResult<usize> {
993        Ok(1024 * 1024)
994    }
995}
996
997#[async_trait::async_trait]
998impl wasmtime_wasi::p2::Pollable for LogStream {
999    async fn ready(&mut self) {}
1000}
1001
1002impl AsyncWrite for LogStream {
1003    fn poll_write(
1004        mut self: Pin<&mut Self>,
1005        _cx: &mut Context<'_>,
1006        buf: &[u8],
1007    ) -> Poll<io::Result<usize>> {
1008        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1009    }
1010    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1011        Poll::Ready(Ok(()))
1012    }
1013    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1014        Poll::Ready(Ok(()))
1015    }
1016}
1017
1018/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
1019/// try to use it when we can. The main cost of the pooling allocator, however,
1020/// is the virtual memory required to run it. Not all systems support the same
1021/// amount of virtual memory, for example some aarch64 and riscv64 configuration
1022/// only support 39 bits of virtual address space.
1023///
1024/// The pooling allocator, by default, will request 1000 linear memories each
1025/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
1026/// being about 42 bits of the address space. This exceeds the 39 bit limit of
1027/// some systems, so there the pooling allocator will fail by default.
1028///
1029/// This function attempts to dynamically determine the hint for the pooling
1030/// allocator. This returns `Some(true)` if the pooling allocator should be used
1031/// by default, or `None` or an error otherwise.
1032///
1033/// The method for testing this is to allocate a 0-sized 64-bit linear memory
1034/// with a maximum size that's N bits large where we force all memories to be
1035/// static. This should attempt to acquire N bits of the virtual address space.
1036/// If successful that should mean that the pooling allocator is OK to use, but
1037/// if it fails then the pooling allocator is not used and the normal mmap-based
1038/// implementation is used instead.
1039fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1040    use wasmtime::{Config, Memory, MemoryType};
1041    const BITS_TO_TEST: u32 = 42;
1042    let mut config = Config::new();
1043    config.wasm_memory64(true);
1044    config.memory_reservation(1 << BITS_TO_TEST);
1045    let engine = Engine::new(&config)?;
1046    let mut store = Store::new(&engine, ());
1047    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
1048    // page size here from the maximum size.
1049    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1050    if Memory::new(&mut store, ty).is_ok() {
1051        Ok(Some(true))
1052    } else {
1053        Ok(None)
1054    }
1055}