wasmtime_cli/commands/serve.rs

use crate::common::{Profile, RunCommon, RunTarget};
use anyhow::{anyhow, bail, Result};
use clap::Parser;
use std::net::SocketAddr;
use std::time::Instant;
use std::{
    path::PathBuf,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc, Mutex,
    },
    time::Duration,
};
use tokio::sync::Notify;
use wasmtime::component::{Component, Linker};
use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
use wasmtime_wasi::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi_http::bindings::http::types::Scheme;
use wasmtime_wasi_http::bindings::ProxyPre;
use wasmtime_wasi_http::io::TokioIo;
use wasmtime_wasi_http::{
    body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView, DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS,
    DEFAULT_OUTGOING_BODY_CHUNK_SIZE,
};

#[cfg(feature = "wasi-config")]
use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
#[cfg(feature = "wasi-keyvalue")]
use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
#[cfg(feature = "wasi-nn")]
use wasmtime_wasi_nn::wit::WasiNnCtx;

struct Host {
    table: wasmtime::component::ResourceTable,
    ctx: WasiCtx,
    http: WasiHttpCtx,
    http_outgoing_body_buffer_chunks: Option<usize>,
    http_outgoing_body_chunk_size: Option<usize>,

    limits: StoreLimits,

    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}

impl IoView for Host {
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }
}
impl WasiView for Host {
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.ctx
    }
}

impl WasiHttpView for Host {
    fn ctx(&mut self) -> &mut WasiHttpCtx {
        &mut self.http
    }

    fn outgoing_body_buffer_chunks(&mut self) -> usize {
        self.http_outgoing_body_buffer_chunks
            .unwrap_or(DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
    }

    fn outgoing_body_chunk_size(&mut self) -> usize {
        self.http_outgoing_body_chunk_size
            .unwrap_or(DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
    }
}

const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
    8080,
);

/// Serves web requests via a WebAssembly component
#[derive(Parser)]
pub struct ServeCommand {
    #[command(flatten)]
    run: RunCommon,

    /// Socket address for the web server to bind to.
    #[arg(long, value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    /// Socket address which, when connected to, will initiate a graceful
    /// shutdown.
    ///
    /// Note that graceful shutdown is also supported on ctrl-c.
    #[arg(long, value_name = "SOCKADDR")]
    shutdown_addr: Option<SocketAddr>,

    /// Disable log prefixes of wasi-http handlers.
    ///
    /// If unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '.
    #[arg(long)]
    no_logging_prefix: bool,

    /// The WebAssembly component to run.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}

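// Example invocations (illustrative, not from the original source). These
// assume a built `wasmtime` binary and a `component.wasm` implementing the
// `wasi:http/proxy` world; the flags correspond to the fields above:
//
//   wasmtime serve component.wasm
//   wasmtime serve --addr 127.0.0.1:8081 --no-logging-prefix component.wasm
//   wasmtime serve --shutdown-addr 127.0.0.1:9000 component.wasm
//   wasmtime serve -S cli component.wasm   (link the full `command` WASI APIs)
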
impl ServeCommand {
    /// Start a server to run the given wasi-http proxy component.
    pub fn execute(mut self) -> Result<()> {
        self.run.common.init_logging()?;

        // We force cli errors before starting to listen for connections so
        // that we don't accidentally delay them to the first request.

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("cannot enable wasi-nn when the binary is not compiled with this feature");
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("wasi-threads does not support components yet")
        }

        // The serve command requires both wasi-http and the component model,
        // so we enable those by default here.
        if self.run.common.wasi.http.replace(true) == Some(false) {
            bail!("wasi-http is required for the serve command, and must not be disabled");
        }
        if self.run.common.wasm.component_model.replace(true) == Some(false) {
            bail!("components are required for the serve command, and must not be disabled");
        }

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .build()?;

        runtime.block_on(self.serve())?;

        Ok(())
    }

    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        builder.env("REQUEST_ID", req_id.to_string());

        let (stdout_prefix, stderr_prefix) = if self.no_logging_prefix {
            (String::new(), String::new())
        } else {
            (
                format!("stdout [{req_id}] :: "),
                format!("stderr [{req_id}] :: "),
            )
        };
        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

        let mut host = Host {
            table: wasmtime::component::ResourceTable::new(),
            ctx: builder.build(),
            http: WasiHttpCtx::new(),
            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,

            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-config")]
            wasi_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
            #[cfg(feature = "profiling")]
            guest_profiler: None,
        };

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(feature = "wasi-config")]
            {
                let vars = WasiConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        // If fuel has been configured, we want to add the configured
        // fuel amount to this store.
        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }

    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        let mut cli = self.run.common.wasi.cli;

        // Accept -Scommon as a deprecated alias for -Scli.
        if let Some(common) = self.run.common.wasi.common {
            if cli.is_some() {
                bail!(
                    "the -Scommon option should not be used with -Scli as it is a deprecated alias"
                );
            } else {
                // In the future, we may add a warning here to tell users to use
                // `-S cli` instead of `-S common`.
                cli = Some(common);
            }
        }

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings, which add just those interfaces that the proxy world
        // uses.
        if cli == Some(true) {
            let link_options = self.run.compute_wasi_features();
            wasmtime_wasi::add_to_linker_with_options_async(linker, &link_options)?;
            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
        } else {
            wasmtime_wasi_http::add_to_linker_async(linker)?;
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }

    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);
        config.async_support(true);

        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }
            Some(Profile::Guest { .. }) => {
                config.epoch_interruption(true);
            }
            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        let instance = linker.instantiate_pre(&component)?;
        let instance = ProxyPre::new(instance)?;

        // Spawn background task(s) waiting for graceful shutdown signals. This
        // always listens for ctrl-c but additionally can listen for a TCP
        // connection to the specified address.
        let shutdown = Arc::new(GracefulShutdown::default());
        tokio::task::spawn({
            let shutdown = shutdown.clone();
            async move {
                tokio::signal::ctrl_c().await.unwrap();
                shutdown.requested.notify_one();
            }
        });
        if let Some(addr) = self.shutdown_addr {
            let listener = tokio::net::TcpListener::bind(addr).await?;
            eprintln!(
                "Listening for shutdown on tcp://{}/",
                listener.local_addr()?
            );
            let shutdown = shutdown.clone();
            tokio::task::spawn(async move {
                let _ = listener.accept().await;
                shutdown.requested.notify_one();
            });
        }
        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Conditionally enable `SO_REUSEADDR` depending on the current
        // platform. On Unix we want this to be able to rebind an address in
        // the `TIME_WAIT` state, which can happen when a server is killed with
        // active TCP connections and then restarted. On Windows, though, if
        // `SO_REUSEADDR` is specified then it enables multiple applications to
        // bind the port at the same time, which is not something we want.
        // Hence this is conditionally set based on the platform (and deviates
        // from Tokio's default of always-on).
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        log::info!("Listening on {}", self.addr);

        let handler = ProxyHandler::new(self, engine, instance);

        loop {
            // Wait for a connection, but also "race" against shutdown to
            // break out of this loop. Once the graceful shutdown signal is
            // received then this loop exits immediately.
            let (stream, _) = tokio::select! {
                _ = shutdown.requested.notified() => break,
                v = listener.accept() => v?,
            };
            let comp = component.clone();
            let stream = TokioIo::new(stream);
            let h = handler.clone();
            let shutdown_guard = shutdown.clone().increment();
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| {
                            handle_request(h.clone(), req, comp.clone())
                        }),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
                drop(shutdown_guard);
            });
        }

        // Upon exiting the loop we'll no longer process any more incoming
        // connections, but there may still be outstanding connections being
        // processed in child tasks. If there are, wait for those to complete
        // before shutting down completely. Also enable short-circuiting this
        // wait with a second ctrl-c signal.
        if shutdown.close() {
            return Ok(());
        }
        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = shutdown.complete.notified() => {}
        }

        Ok(())
    }
}

/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}

#[derive(Default)]
struct GracefulShutdownState {
    active_tasks: u32,
    notify_when_done: bool,
}

impl GracefulShutdown {
    /// Increments the number of active tasks and returns a guard that
    /// decrements the count when dropped, notifying `complete` once the last
    /// task finishes after `close` has been called.
    fn increment(self: Arc<Self>) -> impl Drop {
        struct Guard(Arc<GracefulShutdown>);

        let mut state = self.state.lock().unwrap();
        assert!(!state.notify_when_done);
        state.active_tasks += 1;
        drop(state);

        return Guard(self);

        impl Drop for Guard {
            fn drop(&mut self) {
                let mut state = self.0.state.lock().unwrap();
                state.active_tasks -= 1;
                if state.notify_when_done && state.active_tasks == 0 {
                    self.0.complete.notify_one();
                }
            }
        }
    }

    /// Flags this state as done spawning tasks and returns whether there are
    /// no more child tasks remaining.
    fn close(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        state.notify_when_done = true;
        state.active_tasks == 0
    }
}

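// A minimal sketch (not in the original source) of the accounting above:
// `close` returns `true` only when no guards remain, and the last guard
// dropped afterwards fires the `complete` notification.
#[cfg(test)]
mod graceful_shutdown_tests {
    use super::*;

    #[test]
    fn close_reports_outstanding_guards() {
        let shutdown = Arc::new(GracefulShutdown::default());
        let guard = shutdown.clone().increment();
        // One task is still active, so `close` must report work remaining.
        assert!(!shutdown.close());
        // Dropping the last guard stores a permit in `complete` via
        // `notify_one`, releasing a pending `complete.notified().await`.
        drop(guard);
    }
}
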
/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);

struct EpochThread {
    shutdown: Arc<AtomicBool>,
    handle: Option<std::thread::JoinHandle<()>>,
}

impl EpochThread {
    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let handle = {
            let shutdown = Arc::clone(&shutdown);
            let handle = std::thread::spawn(move || {
                while !shutdown.load(Ordering::Relaxed) {
                    std::thread::sleep(interval);
                    engine.increment_epoch();
                }
            });
            Some(handle)
        };

        EpochThread { shutdown, handle }
    }
}

impl Drop for EpochThread {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.shutdown.store(true, Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
}

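// Illustrative check (not in the original source) that the `Drop`
// implementation above terminates the background thread: `drop` flips the
// shutdown flag and joins, so this test hangs only if that logic is broken.
#[cfg(test)]
mod epoch_thread_tests {
    use super::*;

    #[test]
    fn epoch_thread_stops_on_drop() {
        let engine = Engine::default();
        let thread = EpochThread::spawn(Duration::from_millis(1), engine);
        // Give the thread a moment to tick a few epochs.
        std::thread::sleep(Duration::from_millis(5));
        drop(thread); // joins the background thread; must not hang
    }
}
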
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;

fn setup_epoch_handler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    // Profiling enabled
    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
        #[cfg(feature = "profiling")]
        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
        #[cfg(not(feature = "profiling"))]
        {
            let _ = (path, interval);
            bail!("support for profiling disabled at compile time!");
        }
    }

    // Profiling disabled, but there's a global request timeout
    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
        let start = Instant::now();
        store.epoch_deadline_callback(move |_store| {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
            Ok(UpdateDeadline::Continue(1))
        });
        store.set_epoch_deadline(1);
        let engine = store.engine().clone();
        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
    } else {
        None
    };

    Ok((Box::new(|_store| {}), epoch_thread))
}

#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        module_name,
        interval,
        component,
        std::iter::empty(),
    )));

    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        // Originally epoch counting was used to enforce the timeout here, but
        // that's problematic in many cases because a lot of time can pass
        // (e.g. in hostcalls) during which we don't expect to get sample hits.
        if let Some(timeout) = timeout {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok((write_profile, epoch_thread))
}

struct ProxyHandlerInner {
    cmd: ServeCommand,
    engine: Engine,
    instance_pre: ProxyPre<Host>,
    next_id: AtomicU64,
}

impl ProxyHandlerInner {
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}

#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);

impl ProxyHandler {
    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            cmd,
            engine,
            instance_pre,
            next_id: AtomicU64::from(0),
        }))
    }
}

type Request = hyper::Request<hyper::body::Incoming>;

async fn handle_request(
    ProxyHandler(inner): ProxyHandler,
    req: Request,
    component: Component,
) -> Result<hyper::Response<HyperOutgoingBody>> {
    let (sender, receiver) = tokio::sync::oneshot::channel();

    let req_id = inner.next_req_id();

    log::info!(
        "Request {req_id} handling {} to {}",
        req.method(),
        req.uri()
    );

    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;

    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
    let out = store.data_mut().new_response_outparam(sender)?;
    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;

    let comp = component.clone();
    let task = tokio::task::spawn(async move {
        let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;

        if let Err(e) = proxy
            .wasi_http_incoming_handler()
            .call_handle(&mut store, req, out)
            .await
        {
            log::error!("[{req_id}] :: {e:?}");
            return Err(e);
        }

        write_profile(&mut store);
        drop(epoch_thread);

        Ok(())
    });

    match receiver.await {
        Ok(Ok(resp)) => Ok(resp),
        Ok(Err(e)) => Err(e.into()),
        Err(_) => {
            // An error in the receiver (`RecvError`) only indicates that the
            // task exited before a response was sent (i.e., the sender was
            // dropped); it does not describe the underlying cause of failure.
            // Instead we retrieve and propagate the error from inside the task
            // which should more clearly tell the user what went wrong. Note
            // that we assume the task has already exited at this point so the
            // `await` should resolve immediately.
            let e = match task.await {
                Ok(Ok(())) => {
                    bail!("guest never invoked `response-outparam::set` method")
                }
                Ok(Err(e)) => e,
                Err(e) => e.into(),
            };
            Err(e.context("guest never invoked `response-outparam::set` method"))
        }
    }
}

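// Illustrative interaction (not from the original source), assuming the
// server is running on the default 0.0.0.0:8080 address:
//
//   $ curl http://127.0.0.1:8080/
//
// Anything the guest writes to stdout/stderr while handling request N is
// prefixed per `new_store`, e.g.:
//
//   stdout [0] :: hello from the component
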
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
        use std::io::Write;

        match self {
            Output::Stdout => std::io::stdout().write_all(buf),
            Output::Stderr => std::io::stderr().write_all(buf),
        }
        .map_err(|e| anyhow!(e))
    }
}

#[derive(Clone)]
struct LogStream {
    prefix: String,
    output: Output,
    needs_prefix_on_next_write: bool,
}

impl LogStream {
    fn new(prefix: String, output: Output) -> LogStream {
        LogStream {
            prefix,
            output,
            needs_prefix_on_next_write: true,
        }
    }
}

impl wasmtime_wasi::StdoutStream for LogStream {
    fn stream(&self) -> Box<dyn wasmtime_wasi::OutputStream> {
        Box::new(self.clone())
    }

    fn isatty(&self) -> bool {
        use std::io::IsTerminal;

        match &self.output {
            Output::Stdout => std::io::stdout().is_terminal(),
            Output::Stderr => std::io::stderr().is_terminal(),
        }
    }
}

impl wasmtime_wasi::OutputStream for LogStream {
    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
        let mut bytes = &bytes[..];

        while !bytes.is_empty() {
            if self.needs_prefix_on_next_write {
                self.output
                    .write_all(self.prefix.as_bytes())
                    .map_err(StreamError::LastOperationFailed)?;
                self.needs_prefix_on_next_write = false;
            }
            match bytes.iter().position(|b| *b == b'\n') {
                Some(i) => {
                    let (a, b) = bytes.split_at(i + 1);
                    bytes = b;
                    self.output
                        .write_all(a)
                        .map_err(StreamError::LastOperationFailed)?;
                    self.needs_prefix_on_next_write = true;
                }
                None => {
                    self.output
                        .write_all(bytes)
                        .map_err(StreamError::LastOperationFailed)?;
                    break;
                }
            }
        }

        Ok(())
    }

    fn flush(&mut self) -> StreamResult<()> {
        Ok(())
    }

    fn check_write(&mut self) -> StreamResult<usize> {
        Ok(1024 * 1024)
    }
}

#[async_trait::async_trait]
impl wasmtime_wasi::Pollable for LogStream {
    async fn ready(&mut self) {}
}

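// A small sketch (not in the original source) of the prefix bookkeeping in
// `write` above: the prefix is emitted lazily, so the flag must be set after
// a newline and cleared after a partial line. Output goes to the real stdout;
// the test only inspects the flag.
#[cfg(test)]
mod log_stream_tests {
    use super::*;
    use wasmtime_wasi::OutputStream;

    #[test]
    fn prefix_state_tracks_newlines() {
        let mut stream = LogStream::new("stdout [0] :: ".to_string(), Output::Stdout);
        stream.write(bytes::Bytes::from_static(b"partial")).unwrap();
        assert!(!stream.needs_prefix_on_next_write);
        stream.write(bytes::Bytes::from_static(b" end of line\n")).unwrap();
        assert!(stream.needs_prefix_on_next_write);
    }
}
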
/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
/// try to use it when we can. The main cost of the pooling allocator, however,
/// is the virtual memory required to run it. Not all systems support the same
/// amount of virtual memory; for example some aarch64 and riscv64
/// configurations only support 39 bits of virtual address space.
///
/// The pooling allocator, by default, will request 1000 linear memories, each
/// sized at 6G per linear memory. This is 6T of virtual memory, which ends up
/// being about 42 bits of the address space. This exceeds the 39 bit limit of
/// some systems, so there the pooling allocator would fail by default.
///
/// This function attempts to dynamically determine the hint for the pooling
/// allocator. It returns `Some(true)` if the pooling allocator should be used
/// by default, and `None` otherwise (or an error if engine creation fails).
///
/// The method for testing this is to allocate a 0-sized 64-bit linear memory
/// with a maximum size that's N bits large where we force all memories to be
/// static. This should attempt to acquire N bits of the virtual address space.
/// If successful that should mean that the pooling allocator is OK to use, but
/// if it fails then the pooling allocator is not used and the normal mmap-based
/// implementation is used instead.
fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
    use wasmtime::{Config, Memory, MemoryType};
    const BITS_TO_TEST: u32 = 42;
    let mut config = Config::new();
    config.wasm_memory64(true);
    config.memory_reservation(1 << BITS_TO_TEST);
    let engine = Engine::new(&config)?;
    let mut store = Store::new(&engine, ());
    // NB: the maximum size is in wasm pages, which takes the 16 bits of wasm
    // page size out of the maximum size here.
    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
    if Memory::new(&mut store, ty).is_ok() {
        Ok(Some(true))
    } else {
        Ok(None)
    }
}