wasmtime_cli/commands/
serve.rs

1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, anyhow, bail};
3use clap::Parser;
4use http::{Response, StatusCode};
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::time::Instant;
8use std::{
9    path::PathBuf,
10    sync::{
11        Arc, Mutex,
12        atomic::{AtomicBool, AtomicU64, Ordering},
13    },
14    time::Duration,
15};
16use tokio::sync::Notify;
17use wasmtime::component::{Component, Linker};
18use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
19use wasmtime_wasi::p2::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
20use wasmtime_wasi_http::bindings::ProxyPre;
21use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
22use wasmtime_wasi_http::io::TokioIo;
23use wasmtime_wasi_http::{
24    DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
25    WasiHttpView, body::HyperOutgoingBody,
26};
27
28#[cfg(feature = "wasi-config")]
29use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
30#[cfg(feature = "wasi-keyvalue")]
31use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
32#[cfg(feature = "wasi-nn")]
33use wasmtime_wasi_nn::wit::WasiNnCtx;
34
35struct Host {
36    table: wasmtime::component::ResourceTable,
37    ctx: WasiCtx,
38    http: WasiHttpCtx,
39    http_outgoing_body_buffer_chunks: Option<usize>,
40    http_outgoing_body_chunk_size: Option<usize>,
41
42    limits: StoreLimits,
43
44    #[cfg(feature = "wasi-nn")]
45    nn: Option<WasiNnCtx>,
46
47    #[cfg(feature = "wasi-config")]
48    wasi_config: Option<WasiConfigVariables>,
49
50    #[cfg(feature = "wasi-keyvalue")]
51    wasi_keyvalue: Option<WasiKeyValueCtx>,
52
53    #[cfg(feature = "profiling")]
54    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
55}
56
57impl IoView for Host {
58    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
59        &mut self.table
60    }
61}
62impl WasiView for Host {
63    fn ctx(&mut self) -> &mut WasiCtx {
64        &mut self.ctx
65    }
66}
67
68impl WasiHttpView for Host {
69    fn ctx(&mut self) -> &mut WasiHttpCtx {
70        &mut self.http
71    }
72
73    fn outgoing_body_buffer_chunks(&mut self) -> usize {
74        self.http_outgoing_body_buffer_chunks
75            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
76    }
77
78    fn outgoing_body_chunk_size(&mut self) -> usize {
79        self.http_outgoing_body_chunk_size
80            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
81    }
82}
83
84const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
85    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
86    8080,
87);
88
89/// Runs a WebAssembly module
90#[derive(Parser)]
91pub struct ServeCommand {
92    #[command(flatten)]
93    run: RunCommon,
94
95    /// Socket address for the web server to bind to.
96    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
97    addr: SocketAddr,
98
99    /// Socket address where, when connected to, will initiate a graceful
100    /// shutdown.
101    ///
102    /// Note that graceful shutdown is also supported on ctrl-c.
103    #[arg(long, value_name = "SOCKADDR")]
104    shutdown_addr: Option<SocketAddr>,
105
106    /// Disable log prefixes of wasi-http handlers.
107    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
108    #[arg(long)]
109    no_logging_prefix: bool,
110
111    /// The WebAssembly component to run.
112    #[arg(value_name = "WASM", required = true)]
113    component: PathBuf,
114}
115
116impl ServeCommand {
117    /// Start a server to run the given wasi-http proxy component
118    pub fn execute(mut self) -> Result<()> {
119        self.run.common.init_logging()?;
120
121        // We force cli errors before starting to listen for connections so then
122        // we don't accidentally delay them to the first request.
123
124        if self.run.common.wasi.nn == Some(true) {
125            #[cfg(not(feature = "wasi-nn"))]
126            {
127                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
128            }
129        }
130
131        if self.run.common.wasi.threads == Some(true) {
132            bail!("wasi-threads does not support components yet")
133        }
134
135        // The serve command requires both wasi-http and the component model, so
136        // we enable those by default here.
137        if self.run.common.wasi.http.replace(true) == Some(false) {
138            bail!("wasi-http is required for the serve command, and must not be disabled");
139        }
140        if self.run.common.wasm.component_model.replace(true) == Some(false) {
141            bail!("components are required for the serve command, and must not be disabled");
142        }
143
144        let runtime = tokio::runtime::Builder::new_multi_thread()
145            .enable_time()
146            .enable_io()
147            .build()?;
148
149        runtime.block_on(self.serve())?;
150
151        Ok(())
152    }
153
154    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
155        let mut builder = WasiCtxBuilder::new();
156        self.run.configure_wasip2(&mut builder)?;
157
158        builder.env("REQUEST_ID", req_id.to_string());
159
160        let stdout_prefix: String;
161        let stderr_prefix: String;
162        if self.no_logging_prefix {
163            stdout_prefix = "".to_string();
164            stderr_prefix = "".to_string();
165        } else {
166            stdout_prefix = format!("stdout [{req_id}] :: ");
167            stderr_prefix = format!("stderr [{req_id}] :: ");
168        }
169        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
170        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
171
172        let mut host = Host {
173            table: wasmtime::component::ResourceTable::new(),
174            ctx: builder.build(),
175            http: WasiHttpCtx::new(),
176            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
177            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
178
179            limits: StoreLimits::default(),
180
181            #[cfg(feature = "wasi-nn")]
182            nn: None,
183            #[cfg(feature = "wasi-config")]
184            wasi_config: None,
185            #[cfg(feature = "wasi-keyvalue")]
186            wasi_keyvalue: None,
187            #[cfg(feature = "profiling")]
188            guest_profiler: None,
189        };
190
191        if self.run.common.wasi.nn == Some(true) {
192            #[cfg(feature = "wasi-nn")]
193            {
194                let graphs = self
195                    .run
196                    .common
197                    .wasi
198                    .nn_graph
199                    .iter()
200                    .map(|g| (g.format.clone(), g.dir.clone()))
201                    .collect::<Vec<_>>();
202                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
203                host.nn.replace(WasiNnCtx::new(backends, registry));
204            }
205        }
206
207        if self.run.common.wasi.config == Some(true) {
208            #[cfg(feature = "wasi-config")]
209            {
210                let vars = WasiConfigVariables::from_iter(
211                    self.run
212                        .common
213                        .wasi
214                        .config_var
215                        .iter()
216                        .map(|v| (v.key.clone(), v.value.clone())),
217                );
218                host.wasi_config.replace(vars);
219            }
220        }
221
222        if self.run.common.wasi.keyvalue == Some(true) {
223            #[cfg(feature = "wasi-keyvalue")]
224            {
225                let ctx = WasiKeyValueCtxBuilder::new()
226                    .in_memory_data(
227                        self.run
228                            .common
229                            .wasi
230                            .keyvalue_in_memory_data
231                            .iter()
232                            .map(|v| (v.key.clone(), v.value.clone())),
233                    )
234                    .build();
235                host.wasi_keyvalue.replace(ctx);
236            }
237        }
238
239        let mut store = Store::new(engine, host);
240
241        store.data_mut().limits = self.run.store_limits();
242        store.limiter(|t| &mut t.limits);
243
244        // If fuel has been configured, we want to add the configured
245        // fuel amount to this store.
246        if let Some(fuel) = self.run.common.wasm.fuel {
247            store.set_fuel(fuel)?;
248        }
249
250        Ok(store)
251    }
252
253    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
254        let mut cli = self.run.common.wasi.cli;
255
256        // Accept -Scommon as a deprecated alias for -Scli.
257        if let Some(common) = self.run.common.wasi.common {
258            if cli.is_some() {
259                bail!(
260                    "The -Scommon option should not be use with -Scli as it is a deprecated alias"
261                );
262            } else {
263                // In the future, we may add a warning here to tell users to use
264                // `-S cli` instead of `-S common`.
265                cli = Some(common);
266            }
267        }
268
269        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
270        // to serve as a signal to enable all WASI interfaces instead of just
271        // those in the `proxy` world. If `-Scli` is present then add all
272        // `command` APIs and then additionally add in the required HTTP APIs.
273        //
274        // If `-Scli` isn't passed then use the `add_to_linker_async`
275        // bindings which adds just those interfaces that the proxy interface
276        // uses.
277        if cli == Some(true) {
278            let link_options = self.run.compute_wasi_features();
279            wasmtime_wasi::p2::add_to_linker_with_options_async(linker, &link_options)?;
280            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
281        } else {
282            wasmtime_wasi_http::add_to_linker_async(linker)?;
283        }
284
285        if self.run.common.wasi.nn == Some(true) {
286            #[cfg(not(feature = "wasi-nn"))]
287            {
288                bail!("support for wasi-nn was disabled at compile time");
289            }
290            #[cfg(feature = "wasi-nn")]
291            {
292                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
293                    let ctx = h.nn.as_mut().unwrap();
294                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
295                })?;
296            }
297        }
298
299        if self.run.common.wasi.config == Some(true) {
300            #[cfg(not(feature = "wasi-config"))]
301            {
302                bail!("support for wasi-config was disabled at compile time");
303            }
304            #[cfg(feature = "wasi-config")]
305            {
306                wasmtime_wasi_config::add_to_linker(linker, |h| {
307                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
308                })?;
309            }
310        }
311
312        if self.run.common.wasi.keyvalue == Some(true) {
313            #[cfg(not(feature = "wasi-keyvalue"))]
314            {
315                bail!("support for wasi-keyvalue was disabled at compile time");
316            }
317            #[cfg(feature = "wasi-keyvalue")]
318            {
319                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
320                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
321                })?;
322            }
323        }
324
325        if self.run.common.wasi.threads == Some(true) {
326            bail!("support for wasi-threads is not available with components");
327        }
328
329        if self.run.common.wasi.http == Some(false) {
330            bail!("support for wasi-http must be enabled for `serve` subcommand");
331        }
332
333        Ok(())
334    }
335
336    async fn serve(mut self) -> Result<()> {
337        use hyper::server::conn::http1;
338
339        let mut config = self
340            .run
341            .common
342            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
343        config.wasm_component_model(true);
344        config.async_support(true);
345
346        if self.run.common.wasm.timeout.is_some() {
347            config.epoch_interruption(true);
348        }
349
350        match self.run.profile {
351            Some(Profile::Native(s)) => {
352                config.profiler(s);
353            }
354            Some(Profile::Guest { .. }) => {
355                config.epoch_interruption(true);
356            }
357            None => {}
358        }
359
360        let engine = Engine::new(&config)?;
361        let mut linker = Linker::new(&engine);
362
363        self.add_to_linker(&mut linker)?;
364
365        let component = match self.run.load_module(&engine, &self.component)? {
366            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
367            RunTarget::Component(c) => c,
368        };
369
370        let instance = linker.instantiate_pre(&component)?;
371        let instance = ProxyPre::new(instance)?;
372
373        // Spawn background task(s) waiting for graceful shutdown signals. This
374        // always listens for ctrl-c but additionally can listen for a TCP
375        // connection to the specified address.
376        let shutdown = Arc::new(GracefulShutdown::default());
377        tokio::task::spawn({
378            let shutdown = shutdown.clone();
379            async move {
380                tokio::signal::ctrl_c().await.unwrap();
381                shutdown.requested.notify_one();
382            }
383        });
384        if let Some(addr) = self.shutdown_addr {
385            let listener = tokio::net::TcpListener::bind(addr).await?;
386            eprintln!(
387                "Listening for shutdown on tcp://{}/",
388                listener.local_addr()?
389            );
390            let shutdown = shutdown.clone();
391            tokio::task::spawn(async move {
392                let _ = listener.accept().await;
393                shutdown.requested.notify_one();
394            });
395        }
396
397        let socket = match &self.addr {
398            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
399            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
400        };
401        // Conditionally enable `SO_REUSEADDR` depending on the current
402        // platform. On Unix we want this to be able to rebind an address in
403        // the `TIME_WAIT` state which can happen then a server is killed with
404        // active TCP connections and then restarted. On Windows though if
405        // `SO_REUSEADDR` is specified then it enables multiple applications to
406        // bind the port at the same time which is not something we want. Hence
407        // this is conditionally set based on the platform (and deviates from
408        // Tokio's default from always-on).
409        socket.set_reuseaddr(!cfg!(windows))?;
410        socket.bind(self.addr)?;
411        let listener = socket.listen(100)?;
412
413        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
414
415        log::info!("Listening on {}", self.addr);
416
417        let handler = ProxyHandler::new(self, engine, instance);
418
419        loop {
420            // Wait for a socket, but also "race" against shutdown to break out
421            // of this loop. Once the graceful shutdown signal is received then
422            // this loop exits immediately.
423            let (stream, _) = tokio::select! {
424                _ = shutdown.requested.notified() => break,
425                v = listener.accept() => v?,
426            };
427            let comp = component.clone();
428            let stream = TokioIo::new(stream);
429            let h = handler.clone();
430            let shutdown_guard = shutdown.clone().increment();
431            tokio::task::spawn(async move {
432                if let Err(e) = http1::Builder::new()
433                    .keep_alive(true)
434                    .serve_connection(
435                        stream,
436                        hyper::service::service_fn(move |req| {
437                            let comp = comp.clone();
438                            let h = h.clone();
439                            async move {
440                                use http_body_util::{BodyExt, Full};
441                                fn to_errorcode(_: Infallible) -> ErrorCode {
442                                    unreachable!()
443                                }
444                                match handle_request(h, req, comp).await {
445                                    Ok(r) => Ok::<_, Infallible>(r),
446                                    Err(e) => {
447                                        eprintln!("error: {e:?}");
448                                        let error_html = "\
449<!doctype html>
450<html>
451<head>
452    <title>500 Internal Server Error</title>
453</head>
454<body>
455    <center>
456        <h1>500 Internal Server Error</h1>
457        <hr>
458        wasmtime
459    </center>
460</body>
461</html>";
462                                        Ok(Response::builder()
463                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
464                                            .header("Content-Type", "text/html; charset=UTF-8")
465                                            .body(
466                                                Full::new(bytes::Bytes::from(error_html))
467                                                    .map_err(to_errorcode)
468                                                    .boxed(),
469                                            )
470                                            .unwrap())
471                                    }
472                                }
473                            }
474                        }),
475                    )
476                    .await
477                {
478                    eprintln!("error: {e:?}");
479                }
480                drop(shutdown_guard);
481            });
482        }
483
484        // Upon exiting the loop we'll no longer process any more incoming
485        // connections but there may still be outstanding connections
486        // processing in child tasks. If there are wait for those to complete
487        // before shutting down completely. Also enable short-circuiting this
488        // wait with a second ctrl-c signal.
489        if shutdown.close() {
490            return Ok(());
491        }
492        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
493        tokio::select! {
494            _ = tokio::signal::ctrl_c() => {}
495            _ = shutdown.complete.notified() => {}
496        }
497
498        Ok(())
499    }
500}
501
502/// Helper structure to manage graceful shutdown int he accept loop above.
503#[derive(Default)]
504struct GracefulShutdown {
505    /// Async notification that shutdown has been requested.
506    requested: Notify,
507    /// Async notification that shutdown has completed, signaled when
508    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
509    complete: Notify,
510    /// Internal state related to what's in progress when shutdown is requested.
511    state: Mutex<GracefulShutdownState>,
512}
513
514#[derive(Default)]
515struct GracefulShutdownState {
516    active_tasks: u32,
517    notify_when_done: bool,
518}
519
520impl GracefulShutdown {
521    /// Increments the number of active tasks and returns a guard indicating
522    fn increment(self: Arc<Self>) -> impl Drop {
523        struct Guard(Arc<GracefulShutdown>);
524
525        let mut state = self.state.lock().unwrap();
526        assert!(!state.notify_when_done);
527        state.active_tasks += 1;
528        drop(state);
529
530        return Guard(self);
531
532        impl Drop for Guard {
533            fn drop(&mut self) {
534                let mut state = self.0.state.lock().unwrap();
535                state.active_tasks -= 1;
536                if state.notify_when_done && state.active_tasks == 0 {
537                    self.0.complete.notify_one();
538                }
539            }
540        }
541    }
542
543    /// Flags this state as done spawning tasks and returns whether there are no
544    /// more child tasks remaining.
545    fn close(&self) -> bool {
546        let mut state = self.state.lock().unwrap();
547        state.notify_when_done = true;
548        state.active_tasks == 0
549    }
550}
551
552/// When executing with a timeout enabled, this is how frequently epoch
553/// interrupts will be executed to check for timeouts. If guest profiling
554/// is enabled, the guest epoch period will be used.
555const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
556
557struct EpochThread {
558    shutdown: Arc<AtomicBool>,
559    handle: Option<std::thread::JoinHandle<()>>,
560}
561
562impl EpochThread {
563    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
564        let shutdown = Arc::new(AtomicBool::new(false));
565        let handle = {
566            let shutdown = Arc::clone(&shutdown);
567            let handle = std::thread::spawn(move || {
568                while !shutdown.load(Ordering::Relaxed) {
569                    std::thread::sleep(interval);
570                    engine.increment_epoch();
571                }
572            });
573            Some(handle)
574        };
575
576        EpochThread { shutdown, handle }
577    }
578}
579
580impl Drop for EpochThread {
581    fn drop(&mut self) {
582        if let Some(handle) = self.handle.take() {
583            self.shutdown.store(true, Ordering::Relaxed);
584            handle.join().unwrap();
585        }
586    }
587}
588
589type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
590
591fn setup_epoch_handler(
592    cmd: &ServeCommand,
593    store: &mut Store<Host>,
594    component: Component,
595) -> Result<(WriteProfile, Option<EpochThread>)> {
596    // Profiling Enabled
597    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
598        #[cfg(feature = "profiling")]
599        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
600        #[cfg(not(feature = "profiling"))]
601        {
602            let _ = (path, interval);
603            bail!("support for profiling disabled at compile time!");
604        }
605    }
606
607    // Profiling disabled but there's a global request timeout
608    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
609        let start = Instant::now();
610        store.epoch_deadline_callback(move |_store| {
611            if start.elapsed() > timeout {
612                bail!("Timeout expired");
613            }
614            Ok(UpdateDeadline::Continue(1))
615        });
616        store.set_epoch_deadline(1);
617        let engine = store.engine().clone();
618        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
619    } else {
620        None
621    };
622
623    Ok((Box::new(|_store| {}), epoch_thread))
624}
625
626#[cfg(feature = "profiling")]
627fn setup_guest_profiler(
628    cmd: &ServeCommand,
629    store: &mut Store<Host>,
630    path: String,
631    interval: Duration,
632    component: Component,
633) -> Result<(WriteProfile, Option<EpochThread>)> {
634    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
635
636    let module_name = "<main>";
637
638    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
639        module_name,
640        interval,
641        component,
642        std::iter::empty(),
643    )));
644
645    fn sample(
646        mut store: StoreContextMut<Host>,
647        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
648    ) {
649        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
650        f(
651            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
652            store.as_context(),
653        );
654        store.data_mut().guest_profiler = Some(profiler);
655    }
656
657    // Hostcall entry/exit, etc.
658    store.call_hook(|store, kind| {
659        sample(store, |profiler, store| profiler.call_hook(store, kind));
660        Ok(())
661    });
662
663    let start = Instant::now();
664    let timeout = cmd.run.common.wasm.timeout;
665    store.epoch_deadline_callback(move |store| {
666        sample(store, |profiler, store| {
667            profiler.sample(store, std::time::Duration::ZERO)
668        });
669
670        // Originally epoch counting was used here; this is problematic in
671        // a lot of cases due to there being a lot of time (e.g. in hostcalls)
672        // when we are not expected to get sample hits.
673        if let Some(timeout) = timeout {
674            if start.elapsed() > timeout {
675                bail!("Timeout expired");
676            }
677        }
678
679        Ok(UpdateDeadline::Continue(1))
680    });
681
682    store.set_epoch_deadline(1);
683    let engine = store.engine().clone();
684    let epoch_thread = Some(EpochThread::spawn(interval, engine));
685
686    let write_profile = Box::new(move |store: &mut Store<Host>| {
687        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
688            .expect("profiling doesn't support threads yet");
689        if let Err(e) = std::fs::File::create(&path)
690            .map_err(anyhow::Error::new)
691            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
692        {
693            eprintln!("failed writing profile at {path}: {e:#}");
694        } else {
695            eprintln!();
696            eprintln!("Profile written to: {path}");
697            eprintln!("View this profile at https://profiler.firefox.com/.");
698        }
699    });
700
701    Ok((write_profile, epoch_thread))
702}
703
704struct ProxyHandlerInner {
705    cmd: ServeCommand,
706    engine: Engine,
707    instance_pre: ProxyPre<Host>,
708    next_id: AtomicU64,
709}
710
711impl ProxyHandlerInner {
712    fn next_req_id(&self) -> u64 {
713        self.next_id.fetch_add(1, Ordering::Relaxed)
714    }
715}
716
717#[derive(Clone)]
718struct ProxyHandler(Arc<ProxyHandlerInner>);
719
720impl ProxyHandler {
721    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
722        Self(Arc::new(ProxyHandlerInner {
723            cmd,
724            engine,
725            instance_pre,
726            next_id: AtomicU64::from(0),
727        }))
728    }
729}
730
731type Request = hyper::Request<hyper::body::Incoming>;
732
733async fn handle_request(
734    ProxyHandler(inner): ProxyHandler,
735    req: Request,
736    component: Component,
737) -> Result<hyper::Response<HyperOutgoingBody>> {
738    let (sender, receiver) = tokio::sync::oneshot::channel();
739
740    let req_id = inner.next_req_id();
741
742    log::info!(
743        "Request {req_id} handling {} to {}",
744        req.method(),
745        req.uri()
746    );
747
748    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
749
750    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
751    let out = store.data_mut().new_response_outparam(sender)?;
752    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
753
754    let comp = component.clone();
755    let task = tokio::task::spawn(async move {
756        let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;
757
758        if let Err(e) = proxy
759            .wasi_http_incoming_handler()
760            .call_handle(&mut store, req, out)
761            .await
762        {
763            log::error!("[{req_id}] :: {:?}", e);
764            return Err(e);
765        }
766
767        write_profile(&mut store);
768        drop(epoch_thread);
769
770        Ok(())
771    });
772
773    let result = match receiver.await {
774        Ok(Ok(resp)) => Ok(resp),
775        Ok(Err(e)) => Err(e.into()),
776        Err(_) => {
777            // An error in the receiver (`RecvError`) only indicates that the
778            // task exited before a response was sent (i.e., the sender was
779            // dropped); it does not describe the underlying cause of failure.
780            // Instead we retrieve and propagate the error from inside the task
781            // which should more clearly tell the user what went wrong. Note
782            // that we assume the task has already exited at this point so the
783            // `await` should resolve immediately.
784            let e = match task.await {
785                Ok(Ok(())) => {
786                    bail!("guest never invoked `response-outparam::set` method")
787                }
788                Ok(Err(e)) => e,
789                Err(e) => e.into(),
790            };
791            Err(e.context("guest never invoked `response-outparam::set` method"))
792        }
793    };
794
795    result
796}
797
798#[derive(Clone)]
799enum Output {
800    Stdout,
801    Stderr,
802}
803
804impl Output {
805    fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
806        use std::io::Write;
807
808        match self {
809            Output::Stdout => std::io::stdout().write_all(buf),
810            Output::Stderr => std::io::stderr().write_all(buf),
811        }
812        .map_err(|e| anyhow!(e))
813    }
814}
815
816#[derive(Clone)]
817struct LogStream {
818    prefix: String,
819    output: Output,
820    needs_prefix_on_next_write: bool,
821}
822
823impl LogStream {
824    fn new(prefix: String, output: Output) -> LogStream {
825        LogStream {
826            prefix,
827            output,
828            needs_prefix_on_next_write: true,
829        }
830    }
831}
832
833impl wasmtime_wasi::p2::StdoutStream for LogStream {
834    fn stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
835        Box::new(self.clone())
836    }
837
838    fn isatty(&self) -> bool {
839        use std::io::IsTerminal;
840
841        match &self.output {
842            Output::Stdout => std::io::stdout().is_terminal(),
843            Output::Stderr => std::io::stderr().is_terminal(),
844        }
845    }
846}
847
848impl wasmtime_wasi::p2::OutputStream for LogStream {
849    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
850        let mut bytes = &bytes[..];
851
852        while !bytes.is_empty() {
853            if self.needs_prefix_on_next_write {
854                self.output
855                    .write_all(self.prefix.as_bytes())
856                    .map_err(StreamError::LastOperationFailed)?;
857                self.needs_prefix_on_next_write = false;
858            }
859            match bytes.iter().position(|b| *b == b'\n') {
860                Some(i) => {
861                    let (a, b) = bytes.split_at(i + 1);
862                    bytes = b;
863                    self.output
864                        .write_all(a)
865                        .map_err(StreamError::LastOperationFailed)?;
866                    self.needs_prefix_on_next_write = true;
867                }
868                None => {
869                    self.output
870                        .write_all(bytes)
871                        .map_err(StreamError::LastOperationFailed)?;
872                    break;
873                }
874            }
875        }
876
877        Ok(())
878    }
879
880    fn flush(&mut self) -> StreamResult<()> {
881        Ok(())
882    }
883
884    fn check_write(&mut self) -> StreamResult<usize> {
885        Ok(1024 * 1024)
886    }
887}
888
889#[async_trait::async_trait]
890impl wasmtime_wasi::p2::Pollable for LogStream {
891    async fn ready(&mut self) {}
892}
893
894/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
895/// try to use it when we can. The main cost of the pooling allocator, however,
896/// is the virtual memory required to run it. Not all systems support the same
897/// amount of virtual memory, for example some aarch64 and riscv64 configuration
898/// only support 39 bits of virtual address space.
899///
900/// The pooling allocator, by default, will request 1000 linear memories each
901/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
902/// being about 42 bits of the address space. This exceeds the 39 bit limit of
903/// some systems, so there the pooling allocator will fail by default.
904///
905/// This function attempts to dynamically determine the hint for the pooling
906/// allocator. This returns `Some(true)` if the pooling allocator should be used
907/// by default, or `None` or an error otherwise.
908///
909/// The method for testing this is to allocate a 0-sized 64-bit linear memory
910/// with a maximum size that's N bits large where we force all memories to be
911/// static. This should attempt to acquire N bits of the virtual address space.
912/// If successful that should mean that the pooling allocator is OK to use, but
913/// if it fails then the pooling allocator is not used and the normal mmap-based
914/// implementation is used instead.
915fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
916    use wasmtime::{Config, Memory, MemoryType};
917    const BITS_TO_TEST: u32 = 42;
918    let mut config = Config::new();
919    config.wasm_memory64(true);
920    config.memory_reservation(1 << BITS_TO_TEST);
921    let engine = Engine::new(&config)?;
922    let mut store = Store::new(&engine, ());
923    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
924    // page size here from the maximum size.
925    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
926    if Memory::new(&mut store, ty).is_ok() {
927        Ok(Some(true))
928    } else {
929        Ok(None)
930    }
931}