wasmtime_cli/commands/
serve.rs

1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use clap::Parser;
4use http::{Response, StatusCode};
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Instant;
10use std::{
11    path::PathBuf,
12    sync::{
13        Arc, Mutex,
14        atomic::{AtomicBool, AtomicU64, Ordering},
15    },
16    time::Duration,
17};
18use tokio::io::{self, AsyncWrite};
19use tokio::sync::Notify;
20use wasmtime::component::{Component, Linker, ResourceTable};
21use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
22use wasmtime_wasi::p2::{StreamError, StreamResult};
23use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
24use wasmtime_wasi_http::bindings::ProxyPre;
25use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
26use wasmtime_wasi_http::io::TokioIo;
27use wasmtime_wasi_http::{
28    DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
29    WasiHttpView, body::HyperOutgoingBody,
30};
31
32#[cfg(feature = "wasi-config")]
33use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
34#[cfg(feature = "wasi-keyvalue")]
35use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
36#[cfg(feature = "wasi-nn")]
37use wasmtime_wasi_nn::wit::WasiNnCtx;
38
/// Per-request host state: the resource table plus the WASI, wasi-http, and
/// optional feature-gated contexts backing one instantiated component.
struct Host {
    // Resource table shared by all WASI interfaces in this store.
    table: wasmtime::component::ResourceTable,
    // Core WASI context (env, stdio, preopens, ...), built per request.
    ctx: WasiCtx,
    // wasi-http context for incoming/outgoing HTTP resources.
    http: WasiHttpCtx,
    // CLI overrides for outgoing body buffering; `None` means use the
    // library defaults (see the `WasiHttpView` impl below).
    http_outgoing_body_buffer_chunks: Option<usize>,
    http_outgoing_body_chunk_size: Option<usize>,

    // Store resource limits, populated in `ServeCommand::new_store`.
    limits: StoreLimits,

    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    // Present only while guest profiling is active for this store.
    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
60
impl WasiView for Host {
    // Exposes the WASI context and resource table to the generated bindings.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.ctx,
            table: &mut self.table,
        }
    }
}
69
70impl WasiHttpView for Host {
71    fn ctx(&mut self) -> &mut WasiHttpCtx {
72        &mut self.http
73    }
74    fn table(&mut self) -> &mut ResourceTable {
75        &mut self.table
76    }
77
78    fn outgoing_body_buffer_chunks(&mut self) -> usize {
79        self.http_outgoing_body_buffer_chunks
80            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
81    }
82
83    fn outgoing_body_chunk_size(&mut self) -> usize {
84        self.http_outgoing_body_chunk_size
85            .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
86    }
87}
88
/// Default bind address when `--addr` is not supplied: all IPv4 interfaces,
/// port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
    8080,
);
93
94/// Runs a WebAssembly module
95#[derive(Parser)]
96pub struct ServeCommand {
97    #[command(flatten)]
98    run: RunCommon,
99
100    /// Socket address for the web server to bind to.
101    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
102    addr: SocketAddr,
103
104    /// Socket address where, when connected to, will initiate a graceful
105    /// shutdown.
106    ///
107    /// Note that graceful shutdown is also supported on ctrl-c.
108    #[arg(long, value_name = "SOCKADDR")]
109    shutdown_addr: Option<SocketAddr>,
110
111    /// Disable log prefixes of wasi-http handlers.
112    /// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
113    #[arg(long)]
114    no_logging_prefix: bool,
115
116    /// The WebAssembly component to run.
117    #[arg(value_name = "WASM", required = true)]
118    component: PathBuf,
119}
120
impl ServeCommand {
    /// Start a server to run the given wasi-http proxy component
    pub fn execute(mut self) -> Result<()> {
        self.run.common.init_logging()?;

        // We force cli errors before starting to listen for connections so then
        // we don't accidentally delay them to the first request.

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("wasi-threads does not support components yet")
        }

        // The serve command requires both wasi-http and the component model, so
        // we enable those by default here.
        if self.run.common.wasi.http.replace(true) == Some(false) {
            bail!("wasi-http is required for the serve command, and must not be disabled");
        }
        if self.run.common.wasm.component_model.replace(true) == Some(false) {
            bail!("components are required for the serve command, and must not be disabled");
        }

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .build()?;

        runtime.block_on(self.serve())?;

        Ok(())
    }

    /// Builds a fresh `Store<Host>` for serving a single request.
    ///
    /// Each request gets its own store (and therefore its own WASI context,
    /// resource table, and limits). `req_id` is exposed to the guest via the
    /// `REQUEST_ID` environment variable and used to prefix stdout/stderr.
    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        builder.env("REQUEST_ID", req_id.to_string());

        let stdout_prefix: String;
        let stderr_prefix: String;
        if self.no_logging_prefix {
            stdout_prefix = "".to_string();
            stderr_prefix = "".to_string();
        } else {
            stdout_prefix = format!("stdout [{req_id}] :: ");
            stderr_prefix = format!("stderr [{req_id}] :: ");
        }
        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

        let mut host = Host {
            table: wasmtime::component::ResourceTable::new(),
            ctx: builder.build(),
            http: WasiHttpCtx::new(),
            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,

            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-config")]
            wasi_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
            #[cfg(feature = "profiling")]
            guest_profiler: None,
        };

        // Optional feature contexts are only populated when both the CLI flag
        // and the compile-time feature are enabled.
        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(feature = "wasi-config")]
            {
                let vars = WasiConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        // If fuel has been configured, we want to add the configured
        // fuel amount to this store.
        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }

    /// Registers host APIs on the linker according to the `-S` CLI flags.
    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        let mut cli = self.run.common.wasi.cli;

        // Accept -Scommon as a deprecated alias for -Scli.
        if let Some(common) = self.run.common.wasi.common {
            if cli.is_some() {
                // NOTE(review): the message reads "should not be use"; likely
                // meant "used". Left untouched here since it is runtime text.
                bail!(
                    "The -Scommon option should not be use with -Scli as it is a deprecated alias"
                );
            } else {
                // In the future, we may add a warning here to tell users to use
                // `-S cli` instead of `-S common`.
                cli = Some(common);
            }
        }

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings which adds just those interfaces that the proxy interface
        // uses.
        if cli == Some(true) {
            let link_options = self.run.compute_wasi_features();
            wasmtime_wasi::p2::add_to_linker_with_options_async(linker, &link_options)?;
            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
        } else {
            wasmtime_wasi_http::add_to_linker_async(linker)?;
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }

    /// Binds the listening socket, installs graceful-shutdown listeners, and
    /// serves HTTP/1 connections until a shutdown signal is received.
    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);
        config.async_support(true);

        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }
            Some(Profile::Guest { .. }) => {
                config.epoch_interruption(true);
            }
            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        let instance = linker.instantiate_pre(&component)?;
        let instance = ProxyPre::new(instance)?;

        // Spawn background task(s) waiting for graceful shutdown signals. This
        // always listens for ctrl-c but additionally can listen for a TCP
        // connection to the specified address.
        let shutdown = Arc::new(GracefulShutdown::default());
        tokio::task::spawn({
            let shutdown = shutdown.clone();
            async move {
                tokio::signal::ctrl_c().await.unwrap();
                shutdown.requested.notify_one();
            }
        });
        if let Some(addr) = self.shutdown_addr {
            let listener = tokio::net::TcpListener::bind(addr).await?;
            eprintln!(
                "Listening for shutdown on tcp://{}/",
                listener.local_addr()?
            );
            let shutdown = shutdown.clone();
            tokio::task::spawn(async move {
                let _ = listener.accept().await;
                shutdown.requested.notify_one();
            });
        }

        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Conditionally enable `SO_REUSEADDR` depending on the current
        // platform. On Unix we want this to be able to rebind an address in
        // the `TIME_WAIT` state which can happen when a server is killed with
        // active TCP connections and then restarted. On Windows though if
        // `SO_REUSEADDR` is specified then it enables multiple applications to
        // bind the port at the same time which is not something we want. Hence
        // this is conditionally set based on the platform (and deviates from
        // Tokio's default from always-on).
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        log::info!("Listening on {}", self.addr);

        let handler = ProxyHandler::new(self, engine, instance);

        loop {
            // Wait for a socket, but also "race" against shutdown to break out
            // of this loop. Once the graceful shutdown signal is received then
            // this loop exits immediately.
            let (stream, _) = tokio::select! {
                _ = shutdown.requested.notified() => break,
                v = listener.accept() => v?,
            };
            let comp = component.clone();
            let stream = TokioIo::new(stream);
            let h = handler.clone();
            // Keeps the active-connection count accurate for graceful
            // shutdown; dropped when the connection task finishes.
            let shutdown_guard = shutdown.clone().increment();
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| {
                            let comp = comp.clone();
                            let h = h.clone();
                            async move {
                                use http_body_util::{BodyExt, Full};
                                fn to_errorcode(_: Infallible) -> ErrorCode {
                                    unreachable!()
                                }
                                // Any guest failure is reported to the client
                                // as a plain 500 page; details go to stderr.
                                match handle_request(h, req, comp).await {
                                    Ok(r) => Ok::<_, Infallible>(r),
                                    Err(e) => {
                                        eprintln!("error: {e:?}");
                                        let error_html = "\
<!doctype html>
<html>
<head>
    <title>500 Internal Server Error</title>
</head>
<body>
    <center>
        <h1>500 Internal Server Error</h1>
        <hr>
        wasmtime
    </center>
</body>
</html>";
                                        Ok(Response::builder()
                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
                                            .header("Content-Type", "text/html; charset=UTF-8")
                                            .body(
                                                Full::new(bytes::Bytes::from(error_html))
                                                    .map_err(to_errorcode)
                                                    .boxed(),
                                            )
                                            .unwrap())
                                    }
                                }
                            }
                        }),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
                drop(shutdown_guard);
            });
        }

        // Upon exiting the loop we'll no longer process any more incoming
        // connections but there may still be outstanding connections
        // processing in child tasks. If there are wait for those to complete
        // before shutting down completely. Also enable short-circuiting this
        // wait with a second ctrl-c signal.
        if shutdown.close() {
            return Ok(());
        }
        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = shutdown.complete.notified() => {}
        }

        Ok(())
    }
}
506
/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}
518
/// Counters guarded by the `state` mutex of `GracefulShutdown`.
#[derive(Default)]
struct GracefulShutdownState {
    // Number of spawned connection tasks still running.
    active_tasks: u32,
    // Set once shutdown has been requested; the last exiting task then
    // signals `complete`.
    notify_when_done: bool,
}
524
525impl GracefulShutdown {
526    /// Increments the number of active tasks and returns a guard indicating
527    fn increment(self: Arc<Self>) -> impl Drop {
528        struct Guard(Arc<GracefulShutdown>);
529
530        let mut state = self.state.lock().unwrap();
531        assert!(!state.notify_when_done);
532        state.active_tasks += 1;
533        drop(state);
534
535        return Guard(self);
536
537        impl Drop for Guard {
538            fn drop(&mut self) {
539                let mut state = self.0.state.lock().unwrap();
540                state.active_tasks -= 1;
541                if state.notify_when_done && state.active_tasks == 0 {
542                    self.0.complete.notify_one();
543                }
544            }
545        }
546    }
547
548    /// Flags this state as done spawning tasks and returns whether there are no
549    /// more child tasks remaining.
550    fn close(&self) -> bool {
551        let mut state = self.state.lock().unwrap();
552        state.notify_when_done = true;
553        state.active_tasks == 0
554    }
555}
556
/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);

/// Background thread that periodically bumps an engine's epoch counter.
struct EpochThread {
    // Set to `true` to ask the background thread to exit.
    shutdown: Arc<AtomicBool>,
    // Join handle for the thread; taken (and joined) in `Drop`.
    handle: Option<std::thread::JoinHandle<()>>,
}
566
567impl EpochThread {
568    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
569        let shutdown = Arc::new(AtomicBool::new(false));
570        let handle = {
571            let shutdown = Arc::clone(&shutdown);
572            let handle = std::thread::spawn(move || {
573                while !shutdown.load(Ordering::Relaxed) {
574                    std::thread::sleep(interval);
575                    engine.increment_epoch();
576                }
577            });
578            Some(handle)
579        };
580
581        EpochThread { shutdown, handle }
582    }
583}
584
585impl Drop for EpochThread {
586    fn drop(&mut self) {
587        if let Some(handle) = self.handle.take() {
588            self.shutdown.store(true, Ordering::Relaxed);
589            handle.join().unwrap();
590        }
591    }
592}
593
/// Callback invoked after a request completes; writes out the guest profile
/// when profiling was enabled, otherwise a no-op.
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;

/// Installs epoch-deadline handling on `store` for one request: either guest
/// profiling (which also enforces the timeout) or a plain timeout check.
/// Returns the profile-writing callback and the thread driving epoch ticks
/// (`None` when neither profiling nor a timeout is configured).
fn setup_epoch_handler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    // Profiling Enabled
    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
        #[cfg(feature = "profiling")]
        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
        #[cfg(not(feature = "profiling"))]
        {
            let _ = (path, interval);
            bail!("support for profiling disabled at compile time!");
        }
    }

    // Profiling disabled but there's a global request timeout
    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
        let start = Instant::now();
        // Checked on every epoch tick; wall-clock based rather than counting
        // epochs so time spent in hostcalls still counts toward the timeout.
        store.epoch_deadline_callback(move |_store| {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
            Ok(UpdateDeadline::Continue(1))
        });
        store.set_epoch_deadline(1);
        let engine = store.engine().clone();
        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
    } else {
        None
    };

    // No profile to write when profiling is disabled.
    Ok((Box::new(|_store| {}), epoch_thread))
}
630
/// Attaches a guest profiler to `store` for the duration of one request,
/// sampling on every epoch tick and enforcing the optional request timeout.
/// Returns a callback that writes the finished profile to `path`.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        module_name,
        interval,
        component,
        std::iter::empty(),
    )));

    // Temporarily takes the profiler out of the store so `f` can borrow the
    // store immutably while mutating the profiler, then puts it back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        // Originally epoch counting was used here; this is problematic in
        // a lot of cases due to there being a lot of time (e.g. in hostcalls)
        // when we are not expected to get sample hits.
        if let Some(timeout) = timeout {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    // Invoked by `handle_request` once the guest finishes; serializes the
    // profile to `path` and points the user at the Firefox Profiler UI.
    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok((write_profile, epoch_thread))
}
708
/// Shared state behind a `ProxyHandler`: the CLI configuration, engine,
/// pre-instantiated proxy world, and the request-id counter.
struct ProxyHandlerInner {
    cmd: ServeCommand,
    engine: Engine,
    instance_pre: ProxyPre<Host>,
    // Monotonic id handed to each incoming request for log correlation.
    next_id: AtomicU64,
}
715
impl ProxyHandlerInner {
    /// Allocates the next request id. `Relaxed` ordering suffices: the id is
    /// only used for logging, not for synchronization.
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
721
/// Cheaply cloneable handle to the shared serving state, cloned once per
/// connection task and once per request.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
724
725impl ProxyHandler {
726    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
727        Self(Arc::new(ProxyHandlerInner {
728            cmd,
729            engine,
730            instance_pre,
731            next_id: AtomicU64::from(0),
732        }))
733    }
734}
735
736type Request = hyper::Request<hyper::body::Incoming>;
737
738async fn handle_request(
739    ProxyHandler(inner): ProxyHandler,
740    req: Request,
741    component: Component,
742) -> Result<hyper::Response<HyperOutgoingBody>> {
743    let (sender, receiver) = tokio::sync::oneshot::channel();
744
745    let req_id = inner.next_req_id();
746
747    log::info!(
748        "Request {req_id} handling {} to {}",
749        req.method(),
750        req.uri()
751    );
752
753    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
754
755    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
756    let out = store.data_mut().new_response_outparam(sender)?;
757    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
758
759    let comp = component.clone();
760    let task = tokio::task::spawn(async move {
761        let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;
762
763        if let Err(e) = proxy
764            .wasi_http_incoming_handler()
765            .call_handle(&mut store, req, out)
766            .await
767        {
768            log::error!("[{req_id}] :: {e:?}");
769            return Err(e);
770        }
771
772        write_profile(&mut store);
773        drop(epoch_thread);
774
775        Ok(())
776    });
777
778    let result = match receiver.await {
779        Ok(Ok(resp)) => Ok(resp),
780        Ok(Err(e)) => Err(e.into()),
781        Err(_) => {
782            // An error in the receiver (`RecvError`) only indicates that the
783            // task exited before a response was sent (i.e., the sender was
784            // dropped); it does not describe the underlying cause of failure.
785            // Instead we retrieve and propagate the error from inside the task
786            // which should more clearly tell the user what went wrong. Note
787            // that we assume the task has already exited at this point so the
788            // `await` should resolve immediately.
789            let e = match task.await {
790                Ok(Ok(())) => {
791                    bail!("guest never invoked `response-outparam::set` method")
792                }
793                Ok(Err(e)) => e,
794                Err(e) => e.into(),
795            };
796            Err(e.context("guest never invoked `response-outparam::set` method"))
797        }
798    };
799
800    result
801}
802
/// Which standard stream a `LogStream` forwards guest output to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}
808
809impl Output {
810    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
811        use std::io::Write;
812
813        match self {
814            Output::Stdout => std::io::stdout().write_all(buf),
815            Output::Stderr => std::io::stderr().write_all(buf),
816        }
817    }
818}
819
/// Cloneable stream that writes guest stdout/stderr to the host's standard
/// streams, inserting a per-request prefix at the start of each line.
#[derive(Clone)]
struct LogStream {
    output: Output,
    state: Arc<LogStreamState>,
}
825
/// State shared between all clones of a `LogStream`.
struct LogStreamState {
    // Prefix emitted at the start of every output line.
    prefix: String,
    // True when the next write should begin with the prefix, i.e. the
    // previous write ended with a newline (or nothing was written yet).
    needs_prefix_on_next_write: AtomicBool,
}
830
831impl LogStream {
832    fn new(prefix: String, output: Output) -> LogStream {
833        LogStream {
834            output,
835            state: Arc::new(LogStreamState {
836                prefix,
837                needs_prefix_on_next_write: AtomicBool::new(true),
838            }),
839        }
840    }
841
842    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
843        while !bytes.is_empty() {
844            if self
845                .state
846                .needs_prefix_on_next_write
847                .load(Ordering::Relaxed)
848            {
849                self.output.write_all(self.state.prefix.as_bytes())?;
850                self.state
851                    .needs_prefix_on_next_write
852                    .store(false, Ordering::Relaxed);
853            }
854            match bytes.iter().position(|b| *b == b'\n') {
855                Some(i) => {
856                    let (a, b) = bytes.split_at(i + 1);
857                    bytes = b;
858                    self.output.write_all(a)?;
859                    self.state
860                        .needs_prefix_on_next_write
861                        .store(true, Ordering::Relaxed);
862                }
863                None => {
864                    self.output.write_all(bytes)?;
865                    break;
866                }
867            }
868        }
869
870        Ok(())
871    }
872}
873
impl wasmtime_wasi::cli::StdoutStream for LogStream {
    // Both stream flavors are clones sharing the same prefixing state.
    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
        Box::new(self.clone())
    }
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        Box::new(self.clone())
    }
}
882
883impl wasmtime_wasi::cli::IsTerminal for LogStream {
884    fn is_terminal(&self) -> bool {
885        match &self.output {
886            Output::Stdout => std::io::stdout().is_terminal(),
887            Output::Stderr => std::io::stderr().is_terminal(),
888        }
889    }
890}
891
892impl wasmtime_wasi::p2::OutputStream for LogStream {
893    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
894        self.write_all(&bytes)
895            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
896        Ok(())
897    }
898
899    fn flush(&mut self) -> StreamResult<()> {
900        Ok(())
901    }
902
903    fn check_write(&mut self) -> StreamResult<usize> {
904        Ok(1024 * 1024)
905    }
906}
907
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    // Writes never block, so the stream is always ready.
    async fn ready(&mut self) {}
}
912
913impl AsyncWrite for LogStream {
914    fn poll_write(
915        mut self: Pin<&mut Self>,
916        _cx: &mut Context<'_>,
917        buf: &[u8],
918    ) -> Poll<io::Result<usize>> {
919        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
920    }
921    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
922        Poll::Ready(Ok(()))
923    }
924    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
925        Poll::Ready(Ok(()))
926    }
927}
928
929/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
930/// try to use it when we can. The main cost of the pooling allocator, however,
931/// is the virtual memory required to run it. Not all systems support the same
932/// amount of virtual memory, for example some aarch64 and riscv64 configuration
933/// only support 39 bits of virtual address space.
934///
935/// The pooling allocator, by default, will request 1000 linear memories each
936/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
937/// being about 42 bits of the address space. This exceeds the 39 bit limit of
938/// some systems, so there the pooling allocator will fail by default.
939///
940/// This function attempts to dynamically determine the hint for the pooling
941/// allocator. This returns `Some(true)` if the pooling allocator should be used
942/// by default, or `None` or an error otherwise.
943///
944/// The method for testing this is to allocate a 0-sized 64-bit linear memory
945/// with a maximum size that's N bits large where we force all memories to be
946/// static. This should attempt to acquire N bits of the virtual address space.
947/// If successful that should mean that the pooling allocator is OK to use, but
948/// if it fails then the pooling allocator is not used and the normal mmap-based
949/// implementation is used instead.
950fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
951    use wasmtime::{Config, Memory, MemoryType};
952    const BITS_TO_TEST: u32 = 42;
953    let mut config = Config::new();
954    config.wasm_memory64(true);
955    config.memory_reservation(1 << BITS_TO_TEST);
956    let engine = Engine::new(&config)?;
957    let mut store = Store::new(&engine, ());
958    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
959    // page size here from the maximum size.
960    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
961    if Memory::new(&mut store, ty).is_ok() {
962        Ok(Some(true))
963    } else {
964        Ok(None)
965    }
966}