Skip to main content

wasmtime_cli/commands/
serve.rs

1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use clap::Parser;
4use http::{Response, StatusCode};
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Instant;
10use std::{
11    path::PathBuf,
12    sync::{
13        Arc, Mutex,
14        atomic::{AtomicBool, AtomicU64, Ordering},
15    },
16    time::Duration,
17};
18use tokio::io::{self, AsyncWrite};
19use tokio::sync::Notify;
20use wasmtime::component::{Component, Linker, ResourceTable};
21use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
22use wasmtime_wasi::p2::{StreamError, StreamResult};
23use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
24use wasmtime_wasi_http::bindings::ProxyPre;
25use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
26use wasmtime_wasi_http::io::TokioIo;
27use wasmtime_wasi_http::{
28    DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
29    WasiHttpView, body::HyperOutgoingBody,
30};
31
32#[cfg(feature = "wasi-config")]
33use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
34#[cfg(feature = "wasi-keyvalue")]
35use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
36#[cfg(feature = "wasi-nn")]
37use wasmtime_wasi_nn::wit::WasiNnCtx;
38
/// Per-request host state: everything a single guest instantiation can see.
/// A fresh `Host` is built for every incoming HTTP request in `new_store`.
struct Host {
    // Resource table holding all component-model resources for this store.
    table: wasmtime::component::ResourceTable,
    // Core WASI (wasip2) context: stdio, env vars, preopens, etc.
    ctx: WasiCtx,
    // wasi-http state for incoming/outgoing request plumbing.
    http: WasiHttpCtx,
    // CLI overrides for outgoing-body buffering; `None` means library default.
    http_outgoing_body_buffer_chunks: Option<usize>,
    http_outgoing_body_chunk_size: Option<usize>,

    // Resource limits applied to the store via `Store::limiter`.
    limits: StoreLimits,

    // The following contexts stay `None` unless the corresponding `-S`
    // option enables the interface at runtime.
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    // Shared handle to the in-flight guest profiler when `--profile guest`
    // is active; taken/replaced around each sample in `setup_guest_profiler`.
    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
60
61impl WasiView for Host {
62    fn ctx(&mut self) -> WasiCtxView<'_> {
63        WasiCtxView {
64            ctx: &mut self.ctx,
65            table: &mut self.table,
66        }
67    }
68}
69
70impl WasiHttpView for Host {
71    fn ctx(&mut self) -> &mut WasiHttpCtx {
72        &mut self.http
73    }
74    fn table(&mut self) -> &mut ResourceTable {
75        &mut self.table
76    }
77
78    fn outgoing_body_buffer_chunks(&mut self) -> usize {
79        self.http_outgoing_body_buffer_chunks
80            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
81    }
82
83    fn outgoing_body_chunk_size(&mut self) -> usize {
84        self.http_outgoing_body_chunk_size
85            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
86    }
87}
88
/// Default address the HTTP server binds to when `--addr` is not given:
/// all IPv4 interfaces (0.0.0.0) on port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
    8080,
);
93
/// Runs a WebAssembly module
// NOTE(review): the `///` comments on this struct and its fields are clap
// derive help text — they are emitted in `--help` output at runtime, so they
// must not be reworded casually.
#[derive(Parser)]
pub struct ServeCommand {
    #[command(flatten)]
    run: RunCommon,

    /// Socket address for the web server to bind to.
    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    /// Socket address where, when connected to, will initiate a graceful
    /// shutdown.
    ///
    /// Note that graceful shutdown is also supported on ctrl-c.
    #[arg(long, value_name = "SOCKADDR")]
    shutdown_addr: Option<SocketAddr>,

    /// Disable log prefixes of wasi-http handlers.
    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
    #[arg(long)]
    no_logging_prefix: bool,

    /// The WebAssembly component to run.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}
120
impl ServeCommand {
    /// Start a server to run the given wasi-http proxy component
    pub fn execute(mut self) -> Result<()> {
        self.run.common.init_logging()?;

        // We force cli errors before starting to listen for connections so then
        // we don't accidentally delay them to the first request.

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("wasi-threads does not support components yet")
        }

        // The serve command requires both wasi-http and the component model, so
        // we enable those by default here.
        if self.run.common.wasi.http.replace(true) == Some(false) {
            bail!("wasi-http is required for the serve command, and must not be disabled");
        }
        if self.run.common.wasm.component_model.replace(true) == Some(false) {
            bail!("components are required for the serve command, and must not be disabled");
        }

        // The async server loop below needs both timers and I/O enabled.
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .build()?;

        runtime.block_on(self.serve())?;

        Ok(())
    }

    /// Builds a fresh `Store` (and `Host`) for a single request, with its own
    /// WASI context, resource table, store limits, and a `REQUEST_ID`
    /// environment variable set to `req_id`.
    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        builder.env("REQUEST_ID", req_id.to_string());

        // Unless disabled, prefix guest stdout/stderr lines with the request
        // id so interleaved logs from concurrent requests stay attributable.
        let stdout_prefix: String;
        let stderr_prefix: String;
        if self.no_logging_prefix {
            stdout_prefix = "".to_string();
            stderr_prefix = "".to_string();
        } else {
            stdout_prefix = format!("stdout [{req_id}] :: ");
            stderr_prefix = format!("stderr [{req_id}] :: ");
        }
        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

        let mut table = wasmtime::component::ResourceTable::new();
        if let Some(max) = self.run.common.wasi.max_resources {
            table.set_max_capacity(max);
        }
        let mut host = Host {
            table,
            ctx: builder.build(),
            http: self.run.wasi_http_ctx()?,
            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,

            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-config")]
            wasi_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
            #[cfg(feature = "profiling")]
            guest_profiler: None,
        };

        // Populate the optional per-interface contexts only when both the
        // feature was compiled in and the user enabled it on the CLI.
        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(feature = "wasi-config")]
            {
                let vars = WasiConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
            store.set_hostcall_fuel(fuel);
        }

        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        // If fuel has been configured, we want to add the configured
        // fuel amount to this store.
        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }

    /// Adds all enabled WASI interfaces to `linker`, erroring when a requested
    /// interface was compiled out of this binary.
    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        let mut cli = self.run.common.wasi.cli;

        // Accept -Scommon as a deprecated alias for -Scli.
        if let Some(common) = self.run.common.wasi.common {
            if cli.is_some() {
                bail!(
                    "The -Scommon option should not be use with -Scli as it is a deprecated alias"
                );
            } else {
                // In the future, we may add a warning here to tell users to use
                // `-S cli` instead of `-S common`.
                cli = Some(common);
            }
        }

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings which adds just those interfaces that the proxy interface
        // uses.
        if cli == Some(true) {
            let link_options = self.run.compute_wasi_features();
            wasmtime_wasi::p2::add_to_linker_with_options_async(linker, &link_options)?;
            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
        } else {
            wasmtime_wasi_http::add_to_linker_async(linker)?;
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }

    /// Compiles the component, binds the listen socket, spawns graceful
    /// shutdown listeners (ctrl-c and optionally `--shutdown-addr`), and runs
    /// the accept loop until shutdown is requested.
    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);
        config.async_support(true);

        // Epoch interruption drives both request timeouts and guest-profile
        // sampling, so enable it when either is requested.
        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }
            Some(Profile::Guest { .. }) => {
                config.epoch_interruption(true);
            }
            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        let instance = linker.instantiate_pre(&component)?;
        let instance = ProxyPre::new(instance)?;

        // Spawn background task(s) waiting for graceful shutdown signals. This
        // always listens for ctrl-c but additionally can listen for a TCP
        // connection to the specified address.
        let shutdown = Arc::new(GracefulShutdown::default());
        tokio::task::spawn({
            let shutdown = shutdown.clone();
            async move {
                tokio::signal::ctrl_c().await.unwrap();
                shutdown.requested.notify_one();
            }
        });
        if let Some(addr) = self.shutdown_addr {
            let listener = tokio::net::TcpListener::bind(addr).await?;
            eprintln!(
                "Listening for shutdown on tcp://{}/",
                listener.local_addr()?
            );
            let shutdown = shutdown.clone();
            tokio::task::spawn(async move {
                let _ = listener.accept().await;
                shutdown.requested.notify_one();
            });
        }

        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Conditionally enable `SO_REUSEADDR` depending on the current
        // platform. On Unix we want this to be able to rebind an address in
        // the `TIME_WAIT` state which can happen then a server is killed with
        // active TCP connections and then restarted. On Windows though if
        // `SO_REUSEADDR` is specified then it enables multiple applications to
        // bind the port at the same time which is not something we want. Hence
        // this is conditionally set based on the platform (and deviates from
        // Tokio's default from always-on).
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        log::info!("Listening on {}", self.addr);

        // `self` is consumed here; all per-request configuration is reached
        // through the shared handler from now on.
        let handler = ProxyHandler::new(self, engine, instance);

        loop {
            // Wait for a socket, but also "race" against shutdown to break out
            // of this loop. Once the graceful shutdown signal is received then
            // this loop exits immediately.
            let (stream, _) = tokio::select! {
                _ = shutdown.requested.notified() => break,
                v = listener.accept() => v?,
            };
            let comp = component.clone();
            let stream = TokioIo::new(stream);
            let h = handler.clone();
            // Guard keeps the graceful-shutdown task count accurate; dropped
            // when this connection's task finishes.
            let shutdown_guard = shutdown.clone().increment();
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| {
                            let comp = comp.clone();
                            let h = h.clone();
                            async move {
                                use http_body_util::{BodyExt, Full};
                                fn to_errorcode(_: Infallible) -> ErrorCode {
                                    unreachable!()
                                }
                                // Any error from the guest is converted into a
                                // plain 500 page rather than killing the
                                // connection.
                                match handle_request(h, req, comp).await {
                                    Ok(r) => Ok::<_, Infallible>(r),
                                    Err(e) => {
                                        eprintln!("error: {e:?}");
                                        let error_html = "\
<!doctype html>
<html>
<head>
    <title>500 Internal Server Error</title>
</head>
<body>
    <center>
        <h1>500 Internal Server Error</h1>
        <hr>
        wasmtime
    </center>
</body>
</html>";
                                        Ok(Response::builder()
                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
                                            .header("Content-Type", "text/html; charset=UTF-8")
                                            .body(
                                                Full::new(bytes::Bytes::from(error_html))
                                                    .map_err(to_errorcode)
                                                    .boxed(),
                                            )
                                            .unwrap())
                                    }
                                }
                            }
                        }),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
                drop(shutdown_guard);
            });
        }

        // Upon exiting the loop we'll no longer process any more incoming
        // connections but there may still be outstanding connections
        // processing in child tasks. If there are wait for those to complete
        // before shutting down completely. Also enable short-circuiting this
        // wait with a second ctrl-c signal.
        if shutdown.close() {
            return Ok(());
        }
        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = shutdown.complete.notified() => {}
        }

        Ok(())
    }
}
514
/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}
526
/// Mutable bookkeeping guarded by `GracefulShutdown::state`.
#[derive(Default)]
struct GracefulShutdownState {
    // Number of spawned connection tasks still running.
    active_tasks: u32,
    // Set by `close`; when true, the last exiting task signals `complete`.
    notify_when_done: bool,
}
532
533impl GracefulShutdown {
534    /// Increments the number of active tasks and returns a guard indicating
535    fn increment(self: Arc<Self>) -> impl Drop {
536        struct Guard(Arc<GracefulShutdown>);
537
538        let mut state = self.state.lock().unwrap();
539        assert!(!state.notify_when_done);
540        state.active_tasks += 1;
541        drop(state);
542
543        return Guard(self);
544
545        impl Drop for Guard {
546            fn drop(&mut self) {
547                let mut state = self.0.state.lock().unwrap();
548                state.active_tasks -= 1;
549                if state.notify_when_done && state.active_tasks == 0 {
550                    self.0.complete.notify_one();
551                }
552            }
553        }
554    }
555
556    /// Flags this state as done spawning tasks and returns whether there are no
557    /// more child tasks remaining.
558    fn close(&self) -> bool {
559        let mut state = self.state.lock().unwrap();
560        state.notify_when_done = true;
561        state.active_tasks == 0
562    }
563}
564
/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
// 50ms keeps timeout detection reasonably prompt without measurable overhead.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
569
/// Handle to a background thread that periodically increments an engine's
/// epoch; dropping the handle stops and joins the thread.
struct EpochThread {
    // Flag polled by the worker thread; set to request exit.
    shutdown: Arc<AtomicBool>,
    // Join handle, `Some` until `Drop` takes it.
    handle: Option<std::thread::JoinHandle<()>>,
}
574
575impl EpochThread {
576    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
577        let shutdown = Arc::new(AtomicBool::new(false));
578        let handle = {
579            let shutdown = Arc::clone(&shutdown);
580            let handle = std::thread::spawn(move || {
581                while !shutdown.load(Ordering::Relaxed) {
582                    std::thread::sleep(interval);
583                    engine.increment_epoch();
584                }
585            });
586            Some(handle)
587        };
588
589        EpochThread { shutdown, handle }
590    }
591}
592
593impl Drop for EpochThread {
594    fn drop(&mut self) {
595        if let Some(handle) = self.handle.take() {
596            self.shutdown.store(true, Ordering::Relaxed);
597            handle.join().unwrap();
598        }
599    }
600}
601
602type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
603
/// Configures epoch-based interruption for a single request's store.
///
/// Returns a callback to write out a guest profile (a no-op unless guest
/// profiling is enabled) plus an optional background thread driving epoch
/// ticks; the thread is returned so the caller controls when it stops.
fn setup_epoch_handler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    // Profiling Enabled
    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
        #[cfg(feature = "profiling")]
        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
        #[cfg(not(feature = "profiling"))]
        {
            let _ = (path, interval);
            bail!("support for profiling disabled at compile time!");
        }
    }

    // Profiling disabled but there's a global request timeout
    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
        let start = Instant::now();
        // Runs on every epoch tick: errors the guest out once the wall-clock
        // timeout elapses, otherwise lets it continue for another tick.
        store.epoch_deadline_callback(move |_store| {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
            Ok(UpdateDeadline::Continue(1))
        });
        store.set_epoch_deadline(1);
        let engine = store.engine().clone();
        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
    } else {
        None
    };

    Ok((Box::new(|_store| {}), epoch_thread))
}
638
/// Installs a `GuestProfiler` on the store and wires epoch ticks to sampling.
///
/// Returns the callback that writes the finished profile to `path` once the
/// request completes, plus the epoch thread driving samples every `interval`.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        module_name,
        interval,
        component,
        std::iter::empty(),
    )));

    // Temporarily takes the profiler out of the store so `f` can borrow both
    // the profiler and the store context at once, then puts it back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        // Originally epoch counting was used here; this is problematic in
        // a lot of cases due to there being a lot of time (e.g. in hostcalls)
        // when we are not expected to get sample hits.
        if let Some(timeout) = timeout {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    // Invoked at request completion: unwraps the (now uniquely-owned)
    // profiler and serializes it to `path`, reporting success or failure on
    // stderr.
    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok((write_profile, epoch_thread))
}
716
/// State shared by all connection tasks spawned from the accept loop.
struct ProxyHandlerInner {
    // Parsed CLI configuration for this `wasmtime serve` invocation.
    cmd: ServeCommand,
    engine: Engine,
    // Pre-instantiated proxy world; instantiated once per request.
    instance_pre: ProxyPre<Host>,
    // Monotonically increasing id handed to each incoming request.
    next_id: AtomicU64,
}
723
impl ProxyHandlerInner {
    /// Returns the next unique request id. Relaxed ordering suffices: ids
    /// only need to be distinct, not ordered with other memory operations.
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
729
/// Cheaply-cloneable handle to the shared serve state, passed into every
/// hyper connection task.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
732
733impl ProxyHandler {
734    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
735        Self(Arc::new(ProxyHandlerInner {
736            cmd,
737            engine,
738            instance_pre,
739            next_id: AtomicU64::from(0),
740        }))
741    }
742}
743
744type Request = hyper::Request<hyper::body::Incoming>;
745
746async fn handle_request(
747    ProxyHandler(inner): ProxyHandler,
748    req: Request,
749    component: Component,
750) -> Result<hyper::Response<HyperOutgoingBody>> {
751    let (sender, receiver) = tokio::sync::oneshot::channel();
752
753    let req_id = inner.next_req_id();
754
755    log::info!(
756        "Request {req_id} handling {} to {}",
757        req.method(),
758        req.uri()
759    );
760
761    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
762
763    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
764    let out = store.data_mut().new_response_outparam(sender)?;
765    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
766
767    let comp = component.clone();
768    let task = tokio::task::spawn(async move {
769        let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;
770
771        if let Err(e) = proxy
772            .wasi_http_incoming_handler()
773            .call_handle(&mut store, req, out)
774            .await
775        {
776            log::error!("[{req_id}] :: {e:?}");
777            return Err(e);
778        }
779
780        write_profile(&mut store);
781        drop(epoch_thread);
782
783        Ok(())
784    });
785
786    let result = match receiver.await {
787        Ok(Ok(resp)) => Ok(resp),
788        Ok(Err(e)) => Err(e.into()),
789        Err(_) => {
790            // An error in the receiver (`RecvError`) only indicates that the
791            // task exited before a response was sent (i.e., the sender was
792            // dropped); it does not describe the underlying cause of failure.
793            // Instead we retrieve and propagate the error from inside the task
794            // which should more clearly tell the user what went wrong. Note
795            // that we assume the task has already exited at this point so the
796            // `await` should resolve immediately.
797            let e = match task.await {
798                Ok(Ok(())) => {
799                    bail!("guest never invoked `response-outparam::set` method")
800                }
801                Ok(Err(e)) => e,
802                Err(e) => e.into(),
803            };
804            Err(e.context("guest never invoked `response-outparam::set` method"))
805        }
806    };
807
808    result
809}
810
/// Which host standard stream a `LogStream` forwards guest output to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}
816
817impl Output {
818    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
819        use std::io::Write;
820
821        match self {
822            Output::Stdout => std::io::stdout().write_all(buf),
823            Output::Stderr => std::io::stderr().write_all(buf),
824        }
825    }
826}
827
/// Guest stdio stream that prefixes each output line. State is shared via
/// `Arc` so clones handed to the WASI plumbing agree on prefix bookkeeping.
#[derive(Clone)]
struct LogStream {
    output: Output,
    state: Arc<LogStreamState>,
}
833
/// Prefix text plus the "start of a new line" flag shared by all clones of a
/// `LogStream`.
struct LogStreamState {
    prefix: String,
    // True when the next byte written begins a new line and needs the prefix.
    needs_prefix_on_next_write: AtomicBool,
}
838
839impl LogStream {
840    fn new(prefix: String, output: Output) -> LogStream {
841        LogStream {
842            output,
843            state: Arc::new(LogStreamState {
844                prefix,
845                needs_prefix_on_next_write: AtomicBool::new(true),
846            }),
847        }
848    }
849
850    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
851        while !bytes.is_empty() {
852            if self
853                .state
854                .needs_prefix_on_next_write
855                .load(Ordering::Relaxed)
856            {
857                self.output.write_all(self.state.prefix.as_bytes())?;
858                self.state
859                    .needs_prefix_on_next_write
860                    .store(false, Ordering::Relaxed);
861            }
862            match bytes.iter().position(|b| *b == b'\n') {
863                Some(i) => {
864                    let (a, b) = bytes.split_at(i + 1);
865                    bytes = b;
866                    self.output.write_all(a)?;
867                    self.state
868                        .needs_prefix_on_next_write
869                        .store(true, Ordering::Relaxed);
870                }
871                None => {
872                    self.output.write_all(bytes)?;
873                    break;
874                }
875            }
876        }
877
878        Ok(())
879    }
880}
881
impl wasmtime_wasi::cli::StdoutStream for LogStream {
    // Both the p2 (pollable) and async flavors are backed by clones of the
    // same stream; the shared `Arc` keeps prefix state consistent between
    // them.
    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
        Box::new(self.clone())
    }
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        Box::new(self.clone())
    }
}
890
891impl wasmtime_wasi::cli::IsTerminal for LogStream {
892    fn is_terminal(&self) -> bool {
893        match &self.output {
894            Output::Stdout => std::io::stdout().is_terminal(),
895            Output::Stderr => std::io::stderr().is_terminal(),
896        }
897    }
898}
899
900impl wasmtime_wasi::p2::OutputStream for LogStream {
901    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
902        self.write_all(&bytes)
903            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
904        Ok(())
905    }
906
907    fn flush(&mut self) -> StreamResult<()> {
908        Ok(())
909    }
910
911    fn check_write(&mut self) -> StreamResult<usize> {
912        Ok(1024 * 1024)
913    }
914}
915
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    // Writes never block here, so readiness resolves immediately.
    async fn ready(&mut self) {}
}
920
921impl AsyncWrite for LogStream {
922    fn poll_write(
923        mut self: Pin<&mut Self>,
924        _cx: &mut Context<'_>,
925        buf: &[u8],
926    ) -> Poll<io::Result<usize>> {
927        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
928    }
929    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
930        Poll::Ready(Ok(()))
931    }
932    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
933        Poll::Ready(Ok(()))
934    }
935}
936
937/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
938/// try to use it when we can. The main cost of the pooling allocator, however,
939/// is the virtual memory required to run it. Not all systems support the same
940/// amount of virtual memory, for example some aarch64 and riscv64 configuration
941/// only support 39 bits of virtual address space.
942///
943/// The pooling allocator, by default, will request 1000 linear memories each
944/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
945/// being about 42 bits of the address space. This exceeds the 39 bit limit of
946/// some systems, so there the pooling allocator will fail by default.
947///
948/// This function attempts to dynamically determine the hint for the pooling
949/// allocator. This returns `Some(true)` if the pooling allocator should be used
950/// by default, or `None` or an error otherwise.
951///
952/// The method for testing this is to allocate a 0-sized 64-bit linear memory
953/// with a maximum size that's N bits large where we force all memories to be
954/// static. This should attempt to acquire N bits of the virtual address space.
955/// If successful that should mean that the pooling allocator is OK to use, but
956/// if it fails then the pooling allocator is not used and the normal mmap-based
957/// implementation is used instead.
958fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
959    use wasmtime::{Config, Memory, MemoryType};
960    const BITS_TO_TEST: u32 = 42;
961    let mut config = Config::new();
962    config.wasm_memory64(true);
963    config.memory_reservation(1 << BITS_TO_TEST);
964    let engine = Engine::new(&config)?;
965    let mut store = Store::new(&engine, ());
966    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
967    // page size here from the maximum size.
968    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
969    if Memory::new(&mut store, ty).is_ok() {
970        Ok(Some(true))
971    } else {
972        Ok(None)
973    }
974}