1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Context as _, Result, bail};
3use bytes::Bytes;
4use clap::Parser;
5use futures::future::FutureExt;
6use http::{Response, StatusCode};
7use http_body_util::BodyExt as _;
8use http_body_util::combinators::UnsyncBoxBody;
9use std::convert::Infallible;
10use std::net::SocketAddr;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::{
14 path::PathBuf,
15 sync::{
16 Arc, Mutex,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker, ResourceTable};
24use wasmtime::{Engine, Store, StoreContextMut, StoreLimits, UpdateDeadline};
25use wasmtime_cli_flags::opt::WasmtimeOptionValue;
26use wasmtime_wasi::p2::{StreamError, StreamResult};
27use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
28#[cfg(feature = "component-model-async")]
29use wasmtime_wasi_http::handler::p2::bindings as p2;
30use wasmtime_wasi_http::handler::{HandlerState, Proxy, ProxyHandler, ProxyPre, StoreBundle};
31use wasmtime_wasi_http::io::TokioIo;
32use wasmtime_wasi_http::{
33 DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
34 WasiHttpView,
35};
36
37#[cfg(feature = "wasi-config")]
38use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
39#[cfg(feature = "wasi-keyvalue")]
40use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
41#[cfg(feature = "wasi-nn")]
42use wasmtime_wasi_nn::wit::WasiNnCtx;
43
44const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
45const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
46const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
47
48struct Host {
49 table: wasmtime::component::ResourceTable,
50 ctx: WasiCtx,
51 http: WasiHttpCtx,
52 http_outgoing_body_buffer_chunks: Option<usize>,
53 http_outgoing_body_chunk_size: Option<usize>,
54
55 #[cfg(feature = "component-model-async")]
56 p3_http: crate::common::DefaultP3Ctx,
57
58 limits: StoreLimits,
59
60 #[cfg(feature = "wasi-nn")]
61 nn: Option<WasiNnCtx>,
62
63 #[cfg(feature = "wasi-config")]
64 wasi_config: Option<WasiConfigVariables>,
65
66 #[cfg(feature = "wasi-keyvalue")]
67 wasi_keyvalue: Option<WasiKeyValueCtx>,
68
69 #[cfg(feature = "profiling")]
70 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
71}
72
73impl WasiView for Host {
74 fn ctx(&mut self) -> WasiCtxView<'_> {
75 WasiCtxView {
76 ctx: &mut self.ctx,
77 table: &mut self.table,
78 }
79 }
80}
81
82impl WasiHttpView for Host {
83 fn ctx(&mut self) -> &mut WasiHttpCtx {
84 &mut self.http
85 }
86 fn table(&mut self) -> &mut ResourceTable {
87 &mut self.table
88 }
89
90 fn outgoing_body_buffer_chunks(&mut self) -> usize {
91 self.http_outgoing_body_buffer_chunks
92 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
93 }
94
95 fn outgoing_body_chunk_size(&mut self) -> usize {
96 self.http_outgoing_body_chunk_size
97 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
98 }
99}
100
101#[cfg(feature = "component-model-async")]
102impl wasmtime_wasi_http::p3::WasiHttpView for Host {
103 fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
104 wasmtime_wasi_http::p3::WasiHttpCtxView {
105 table: &mut self.table,
106 ctx: &mut self.p3_http,
107 }
108 }
109}
110
111const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
112 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
113 8080,
114);
115
116fn parse_duration(s: &str) -> Result<Duration, String> {
117 Duration::parse(Some(s)).map_err(|e| e.to_string())
118}
119
120#[derive(Parser)]
122pub struct ServeCommand {
123 #[command(flatten)]
124 run: RunCommon,
125
126 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
128 addr: SocketAddr,
129
130 #[arg(long, value_name = "SOCKADDR")]
135 shutdown_addr: Option<SocketAddr>,
136
137 #[arg(long)]
140 no_logging_prefix: bool,
141
142 #[arg(value_name = "WASM", required = true)]
144 component: PathBuf,
145
146 #[arg(long)]
151 max_instance_reuse_count: Option<usize>,
152
153 #[arg(long)]
160 max_instance_concurrent_reuse_count: Option<usize>,
161
162 #[arg(long, default_value = "1s", value_parser = parse_duration)]
169 idle_instance_timeout: Duration,
170}
171
172impl ServeCommand {
173 pub fn execute(mut self) -> Result<()> {
175 self.run.common.init_logging()?;
176
177 if self.run.common.wasi.nn == Some(true) {
181 #[cfg(not(feature = "wasi-nn"))]
182 {
183 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
184 }
185 }
186
187 if self.run.common.wasi.threads == Some(true) {
188 bail!("wasi-threads does not support components yet")
189 }
190
191 if self.run.common.wasi.http.replace(true) == Some(false) {
194 bail!("wasi-http is required for the serve command, and must not be disabled");
195 }
196 if self.run.common.wasm.component_model.replace(true) == Some(false) {
197 bail!("components are required for the serve command, and must not be disabled");
198 }
199
200 let runtime = tokio::runtime::Builder::new_multi_thread()
201 .enable_time()
202 .enable_io()
203 .build()?;
204
205 runtime.block_on(self.serve())?;
206
207 Ok(())
208 }
209
210 fn new_store(&self, engine: &Engine, req_id: Option<u64>) -> Result<Store<Host>> {
211 let mut builder = WasiCtxBuilder::new();
212 self.run.configure_wasip2(&mut builder)?;
213
214 if let Some(req_id) = req_id {
215 builder.env("REQUEST_ID", req_id.to_string());
216 }
217
218 let stdout_prefix: String;
219 let stderr_prefix: String;
220 match req_id {
221 Some(req_id) if !self.no_logging_prefix => {
222 stdout_prefix = format!("stdout [{req_id}] :: ");
223 stderr_prefix = format!("stderr [{req_id}] :: ");
224 }
225 _ => {
226 stdout_prefix = "".to_string();
227 stderr_prefix = "".to_string();
228 }
229 }
230 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
231 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
232
233 let mut table = wasmtime::component::ResourceTable::new();
234 if let Some(max) = self.run.common.wasi.max_resources {
235 table.set_max_capacity(max);
236 }
237 let mut host = Host {
238 table,
239 ctx: builder.build(),
240 http: self.run.wasi_http_ctx()?,
241 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
242 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
243
244 limits: StoreLimits::default(),
245
246 #[cfg(feature = "wasi-nn")]
247 nn: None,
248 #[cfg(feature = "wasi-config")]
249 wasi_config: None,
250 #[cfg(feature = "wasi-keyvalue")]
251 wasi_keyvalue: None,
252 #[cfg(feature = "profiling")]
253 guest_profiler: None,
254 #[cfg(feature = "component-model-async")]
255 p3_http: crate::common::DefaultP3Ctx,
256 };
257
258 if self.run.common.wasi.nn == Some(true) {
259 #[cfg(feature = "wasi-nn")]
260 {
261 let graphs = self
262 .run
263 .common
264 .wasi
265 .nn_graph
266 .iter()
267 .map(|g| (g.format.clone(), g.dir.clone()))
268 .collect::<Vec<_>>();
269 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
270 host.nn.replace(WasiNnCtx::new(backends, registry));
271 }
272 }
273
274 if self.run.common.wasi.config == Some(true) {
275 #[cfg(feature = "wasi-config")]
276 {
277 let vars = WasiConfigVariables::from_iter(
278 self.run
279 .common
280 .wasi
281 .config_var
282 .iter()
283 .map(|v| (v.key.clone(), v.value.clone())),
284 );
285 host.wasi_config.replace(vars);
286 }
287 }
288
289 if self.run.common.wasi.keyvalue == Some(true) {
290 #[cfg(feature = "wasi-keyvalue")]
291 {
292 let ctx = WasiKeyValueCtxBuilder::new()
293 .in_memory_data(
294 self.run
295 .common
296 .wasi
297 .keyvalue_in_memory_data
298 .iter()
299 .map(|v| (v.key.clone(), v.value.clone())),
300 )
301 .build();
302 host.wasi_keyvalue.replace(ctx);
303 }
304 }
305
306 let mut store = Store::new(engine, host);
307
308 if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
309 store.set_hostcall_fuel(fuel);
310 }
311
312 store.data_mut().limits = self.run.store_limits();
313 store.limiter(|t| &mut t.limits);
314
315 if let Some(fuel) = self.run.common.wasm.fuel {
318 store.set_fuel(fuel)?;
319 }
320
321 Ok(store)
322 }
323
324 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
325 self.run.validate_p3_option()?;
326 let cli = self.run.validate_cli_enabled()?;
327
328 if cli == Some(true) {
337 self.run.add_wasmtime_wasi_to_linker(linker)?;
338 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
339 #[cfg(feature = "component-model-async")]
340 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
341 wasmtime_wasi_http::p3::add_to_linker(linker)?;
342 }
343 } else {
344 wasmtime_wasi_http::add_to_linker_async(linker)?;
345 #[cfg(feature = "component-model-async")]
346 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
347 wasmtime_wasi_http::p3::add_to_linker(linker)?;
348 wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
349 wasmtime_wasi::p3::random::add_to_linker(linker)?;
350 wasmtime_wasi::p3::cli::add_to_linker(linker)?;
351 }
352 }
353
354 if self.run.common.wasi.nn == Some(true) {
355 #[cfg(not(feature = "wasi-nn"))]
356 {
357 bail!("support for wasi-nn was disabled at compile time");
358 }
359 #[cfg(feature = "wasi-nn")]
360 {
361 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
362 let ctx = h.nn.as_mut().unwrap();
363 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
364 })?;
365 }
366 }
367
368 if self.run.common.wasi.config == Some(true) {
369 #[cfg(not(feature = "wasi-config"))]
370 {
371 bail!("support for wasi-config was disabled at compile time");
372 }
373 #[cfg(feature = "wasi-config")]
374 {
375 wasmtime_wasi_config::add_to_linker(linker, |h| {
376 WasiConfig::from(h.wasi_config.as_ref().unwrap())
377 })?;
378 }
379 }
380
381 if self.run.common.wasi.keyvalue == Some(true) {
382 #[cfg(not(feature = "wasi-keyvalue"))]
383 {
384 bail!("support for wasi-keyvalue was disabled at compile time");
385 }
386 #[cfg(feature = "wasi-keyvalue")]
387 {
388 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
389 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
390 })?;
391 }
392 }
393
394 if self.run.common.wasi.threads == Some(true) {
395 bail!("support for wasi-threads is not available with components");
396 }
397
398 if self.run.common.wasi.http == Some(false) {
399 bail!("support for wasi-http must be enabled for `serve` subcommand");
400 }
401
402 Ok(())
403 }
404
405 async fn serve(mut self) -> Result<()> {
406 use hyper::server::conn::http1;
407
408 let mut config = self
409 .run
410 .common
411 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
412 config.wasm_component_model(true);
413 config.async_support(true);
414
415 if self.run.common.wasm.timeout.is_some() {
416 config.epoch_interruption(true);
417 }
418
419 match self.run.profile {
420 Some(Profile::Native(s)) => {
421 config.profiler(s);
422 }
423 Some(Profile::Guest { .. }) => {
424 config.epoch_interruption(true);
425 }
426 None => {}
427 }
428
429 let engine = Engine::new(&config)?;
430 let mut linker = Linker::new(&engine);
431
432 self.add_to_linker(&mut linker)?;
433
434 let component = match self.run.load_module(&engine, &self.component)? {
435 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
436 RunTarget::Component(c) => c,
437 };
438
439 let instance = linker.instantiate_pre(&component)?;
440 #[cfg(feature = "component-model-async")]
441 let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
442 Ok(pre) => ProxyPre::P3(pre),
443 Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
444 };
445 #[cfg(not(feature = "component-model-async"))]
446 let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
447
448 let shutdown = Arc::new(GracefulShutdown::default());
452 tokio::task::spawn({
453 let shutdown = shutdown.clone();
454 async move {
455 tokio::signal::ctrl_c().await.unwrap();
456 shutdown.requested.notify_one();
457 }
458 });
459 if let Some(addr) = self.shutdown_addr {
460 let listener = tokio::net::TcpListener::bind(addr).await?;
461 eprintln!(
462 "Listening for shutdown on tcp://{}/",
463 listener.local_addr()?
464 );
465 let shutdown = shutdown.clone();
466 tokio::task::spawn(async move {
467 let _ = listener.accept().await;
468 shutdown.requested.notify_one();
469 });
470 }
471
472 let socket = match &self.addr {
473 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
474 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
475 };
476 socket.set_reuseaddr(!cfg!(windows))?;
485 socket.bind(self.addr)?;
486 let listener = socket.listen(100)?;
487
488 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
489
490 log::info!("Listening on {}", self.addr);
491
492 let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
493 Some(interval)
494 } else if let Some(t) = self.run.common.wasm.timeout {
495 Some(EPOCH_INTERRUPT_PERIOD.min(t))
496 } else {
497 None
498 };
499 let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));
500
501 let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
502 if let ProxyPre::P3(_) = &instance {
503 DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
504 } else {
505 DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
506 }
507 });
508
509 let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
510 self.max_instance_concurrent_reuse_count
511 .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
512 } else {
513 1
514 };
515
516 let handler = ProxyHandler::new(
517 HostHandlerState {
518 cmd: self,
519 engine,
520 component,
521 max_instance_reuse_count,
522 max_instance_concurrent_reuse_count,
523 },
524 instance,
525 );
526
527 loop {
528 let (stream, _) = tokio::select! {
532 _ = shutdown.requested.notified() => break,
533 v = listener.accept() => v?,
534 };
535
536 stream.set_nodelay(true)?;
542
543 let stream = TokioIo::new(stream);
544 let h = handler.clone();
545 let shutdown_guard = shutdown.clone().increment();
546 tokio::task::spawn(async move {
547 if let Err(e) = http1::Builder::new()
548 .keep_alive(true)
549 .serve_connection(
550 stream,
551 hyper::service::service_fn(move |req| {
552 let h = h.clone();
553 async move {
554 use http_body_util::{BodyExt, Full};
555 match handle_request(h, req).await {
556 Ok(r) => Ok::<_, Infallible>(r),
557 Err(e) => {
558 eprintln!("error: {e:?}");
559 let error_html = "\
560<!doctype html>
561<html>
562<head>
563 <title>500 Internal Server Error</title>
564</head>
565<body>
566 <center>
567 <h1>500 Internal Server Error</h1>
568 <hr>
569 wasmtime
570 </center>
571</body>
572</html>";
573 Ok(Response::builder()
574 .status(StatusCode::INTERNAL_SERVER_ERROR)
575 .header("Content-Type", "text/html; charset=UTF-8")
576 .body(
577 Full::new(bytes::Bytes::from(error_html))
578 .map_err(|_| unreachable!())
579 .boxed_unsync(),
580 )
581 .unwrap())
582 }
583 }
584 }
585 }),
586 )
587 .await
588 {
589 eprintln!("error: {e:?}");
590 }
591 drop(shutdown_guard);
592 });
593 }
594
595 if shutdown.close() {
601 return Ok(());
602 }
603 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
604 tokio::select! {
605 _ = tokio::signal::ctrl_c() => {}
606 _ = shutdown.complete.notified() => {}
607 }
608
609 Ok(())
610 }
611}
612
613struct HostHandlerState {
614 cmd: ServeCommand,
615 engine: Engine,
616 component: Component,
617 max_instance_reuse_count: usize,
618 max_instance_concurrent_reuse_count: usize,
619}
620
621impl HandlerState for HostHandlerState {
622 type StoreData = Host;
623
624 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Host>> {
625 let mut store = self.cmd.new_store(&self.engine, req_id)?;
626 let write_profile = setup_epoch_handler(&self.cmd, &mut store, self.component.clone())?;
627
628 Ok(StoreBundle {
629 store,
630 write_profile,
631 })
632 }
633
634 fn request_timeout(&self) -> Duration {
635 self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX)
636 }
637
638 fn idle_instance_timeout(&self) -> Duration {
639 self.cmd.idle_instance_timeout
640 }
641
642 fn max_instance_reuse_count(&self) -> usize {
643 self.max_instance_reuse_count
644 }
645
646 fn max_instance_concurrent_reuse_count(&self) -> usize {
647 self.max_instance_concurrent_reuse_count
648 }
649
650 fn handle_worker_error(&self, error: anyhow::Error) {
651 eprintln!("worker error: {error}");
652 }
653}
654
655#[derive(Default)]
657struct GracefulShutdown {
658 requested: Notify,
660 complete: Notify,
663 state: Mutex<GracefulShutdownState>,
665}
666
667#[derive(Default)]
668struct GracefulShutdownState {
669 active_tasks: u32,
670 notify_when_done: bool,
671}
672
673impl GracefulShutdown {
674 fn increment(self: Arc<Self>) -> impl Drop {
676 struct Guard(Arc<GracefulShutdown>);
677
678 let mut state = self.state.lock().unwrap();
679 assert!(!state.notify_when_done);
680 state.active_tasks += 1;
681 drop(state);
682
683 return Guard(self);
684
685 impl Drop for Guard {
686 fn drop(&mut self) {
687 let mut state = self.0.state.lock().unwrap();
688 state.active_tasks -= 1;
689 if state.notify_when_done && state.active_tasks == 0 {
690 self.0.complete.notify_one();
691 }
692 }
693 }
694 }
695
696 fn close(&self) -> bool {
699 let mut state = self.state.lock().unwrap();
700 state.notify_when_done = true;
701 state.active_tasks == 0
702 }
703}
704
705const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
709
710struct EpochThread {
711 shutdown: Arc<AtomicBool>,
712 handle: Option<std::thread::JoinHandle<()>>,
713}
714
715impl EpochThread {
716 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
717 let shutdown = Arc::new(AtomicBool::new(false));
718 let handle = {
719 let shutdown = Arc::clone(&shutdown);
720 let handle = std::thread::spawn(move || {
721 while !shutdown.load(Ordering::Relaxed) {
722 std::thread::sleep(interval);
723 engine.increment_epoch();
724 }
725 });
726 Some(handle)
727 };
728
729 EpochThread { shutdown, handle }
730 }
731}
732
733impl Drop for EpochThread {
734 fn drop(&mut self) {
735 if let Some(handle) = self.handle.take() {
736 self.shutdown.store(true, Ordering::Relaxed);
737 handle.join().unwrap();
738 }
739 }
740}
741
742type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
743
744fn setup_epoch_handler(
745 cmd: &ServeCommand,
746 store: &mut Store<Host>,
747 component: Component,
748) -> Result<WriteProfile> {
749 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
751 #[cfg(feature = "profiling")]
752 return setup_guest_profiler(store, path.clone(), *interval, component.clone());
753 #[cfg(not(feature = "profiling"))]
754 {
755 let _ = (path, interval);
756 bail!("support for profiling disabled at compile time!");
757 }
758 }
759
760 if cmd.run.common.wasm.timeout.is_some() {
762 store.epoch_deadline_async_yield_and_update(1);
763 }
764
765 Ok(Box::new(|_store| {}))
766}
767
768#[cfg(feature = "profiling")]
769fn setup_guest_profiler(
770 store: &mut Store<Host>,
771 path: String,
772 interval: Duration,
773 component: Component,
774) -> Result<WriteProfile> {
775 use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
776
777 let module_name = "<main>";
778
779 store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
780 store.engine(),
781 module_name,
782 interval,
783 component,
784 std::iter::empty(),
785 )?));
786
787 fn sample(
788 mut store: StoreContextMut<Host>,
789 f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
790 ) {
791 let mut profiler = store.data_mut().guest_profiler.take().unwrap();
792 f(
793 Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
794 store.as_context(),
795 );
796 store.data_mut().guest_profiler = Some(profiler);
797 }
798
799 store.call_hook(|store, kind| {
801 sample(store, |profiler, store| profiler.call_hook(store, kind));
802 Ok(())
803 });
804
805 store.epoch_deadline_callback(move |store| {
806 sample(store, |profiler, store| {
807 profiler.sample(store, std::time::Duration::ZERO)
808 });
809
810 Ok(UpdateDeadline::Continue(1))
811 });
812
813 store.set_epoch_deadline(1);
814
815 let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
816 let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
817 .expect("profiling doesn't support threads yet");
818 if let Err(e) = std::fs::File::create(&path)
819 .map_err(anyhow::Error::new)
820 .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
821 {
822 eprintln!("failed writing profile at {path}: {e:#}");
823 } else {
824 eprintln!();
825 eprintln!("Profile written to: {path}");
826 eprintln!("View this profile at https://profiler.firefox.com/.");
827 }
828 });
829
830 Ok(write_profile)
831}
832
833type Request = hyper::Request<hyper::body::Incoming>;
834
835async fn handle_request(
836 handler: ProxyHandler<HostHandlerState>,
837 req: Request,
838) -> Result<hyper::Response<UnsyncBoxBody<Bytes, anyhow::Error>>> {
839 use tokio::sync::oneshot;
840
841 let req_id = handler.next_req_id();
842
843 log::info!(
844 "Request {req_id} handling {} to {}",
845 req.method(),
846 req.uri()
847 );
848
849 type P2Response = Result<
856 hyper::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
857 p2::http::types::ErrorCode,
858 >;
859 type P3Response = hyper::Response<UnsyncBoxBody<Bytes, anyhow::Error>>;
860
861 enum Sender {
862 P2(oneshot::Sender<P2Response>),
863 P3(oneshot::Sender<P3Response>),
864 }
865
866 enum Receiver {
867 P2(oneshot::Receiver<P2Response>),
868 P3(oneshot::Receiver<P3Response>),
869 }
870
871 let (tx, rx) = match handler.instance_pre() {
872 ProxyPre::P2(_) => {
873 let (tx, rx) = oneshot::channel();
874 (Sender::P2(tx), Receiver::P2(rx))
875 }
876 ProxyPre::P3(_) => {
877 let (tx, rx) = oneshot::channel();
878 (Sender::P3(tx), Receiver::P3(rx))
879 }
880 };
881
882 handler.spawn(
883 if handler.state().max_instance_reuse_count() == 1 {
884 Some(req_id)
885 } else {
886 None
887 },
888 Box::new(move |store, proxy| {
889 Box::pin(
890 async move {
891 match proxy {
892 Proxy::P2(proxy) => {
893 let Sender::P2(tx) = tx else { unreachable!() };
894 let (req, out) = store.with(move |mut store| {
895 let req = store
896 .data_mut()
897 .new_incoming_request(p2::http::types::Scheme::Http, req)?;
898 let out = store.data_mut().new_response_outparam(tx)?;
899 anyhow::Ok((req, out))
900 })?;
901
902 proxy
903 .wasi_http_incoming_handler()
904 .call_handle(store, req, out)
905 .await
906 }
907 Proxy::P3(proxy) => {
908 use wasmtime_wasi_http::p3::bindings::http::types::{
909 ErrorCode, Request,
910 };
911
912 let Sender::P3(tx) = tx else { unreachable!() };
913 let (req, body) = req.into_parts();
914 let body = body.map_err(ErrorCode::from_hyper_request_error);
915 let req = http::Request::from_parts(req, body);
916 let (request, request_io_result) = Request::from_http(req);
917 let (res, task) = proxy.handle(store, request).await??;
918 let res = store
919 .with(|mut store| res.into_http(&mut store, request_io_result))?;
920 _ = tx.send(res.map(|body| body.map_err(|e| e.into()).boxed_unsync()));
921
922 task.block(store).await;
924 Ok(())
925 }
926 }
927 }
928 .map(move |result| {
929 if let Err(error) = result {
930 eprintln!("[{req_id}] :: {error:?}");
931 }
932 }),
933 )
934 }),
935 );
936
937 Ok(match rx {
938 Receiver::P2(rx) => rx
939 .await
940 .context("guest never invoked `response-outparam::set` method")?
941 .map_err(|e| anyhow::Error::from(e))?
942 .map(|body| body.map_err(|e| e.into()).boxed_unsync()),
943 Receiver::P3(rx) => rx.await?,
944 })
945}
946
947#[derive(Clone)]
948enum Output {
949 Stdout,
950 Stderr,
951}
952
953impl Output {
954 fn write_all(&self, buf: &[u8]) -> io::Result<()> {
955 use std::io::Write;
956
957 match self {
958 Output::Stdout => std::io::stdout().write_all(buf),
959 Output::Stderr => std::io::stderr().write_all(buf),
960 }
961 }
962}
963
964#[derive(Clone)]
965struct LogStream {
966 output: Output,
967 state: Arc<LogStreamState>,
968}
969
970struct LogStreamState {
971 prefix: String,
972 needs_prefix_on_next_write: AtomicBool,
973}
974
975impl LogStream {
976 fn new(prefix: String, output: Output) -> LogStream {
977 LogStream {
978 output,
979 state: Arc::new(LogStreamState {
980 prefix,
981 needs_prefix_on_next_write: AtomicBool::new(true),
982 }),
983 }
984 }
985
986 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
987 while !bytes.is_empty() {
988 if self
989 .state
990 .needs_prefix_on_next_write
991 .load(Ordering::Relaxed)
992 {
993 self.output.write_all(self.state.prefix.as_bytes())?;
994 self.state
995 .needs_prefix_on_next_write
996 .store(false, Ordering::Relaxed);
997 }
998 match bytes.iter().position(|b| *b == b'\n') {
999 Some(i) => {
1000 let (a, b) = bytes.split_at(i + 1);
1001 bytes = b;
1002 self.output.write_all(a)?;
1003 self.state
1004 .needs_prefix_on_next_write
1005 .store(true, Ordering::Relaxed);
1006 }
1007 None => {
1008 self.output.write_all(bytes)?;
1009 break;
1010 }
1011 }
1012 }
1013
1014 Ok(())
1015 }
1016}
1017
1018impl wasmtime_wasi::cli::StdoutStream for LogStream {
1019 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
1020 Box::new(self.clone())
1021 }
1022 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
1023 Box::new(self.clone())
1024 }
1025}
1026
1027impl wasmtime_wasi::cli::IsTerminal for LogStream {
1028 fn is_terminal(&self) -> bool {
1029 match &self.output {
1030 Output::Stdout => std::io::stdout().is_terminal(),
1031 Output::Stderr => std::io::stderr().is_terminal(),
1032 }
1033 }
1034}
1035
1036impl wasmtime_wasi::p2::OutputStream for LogStream {
1037 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1038 self.write_all(&bytes)
1039 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1040 Ok(())
1041 }
1042
1043 fn flush(&mut self) -> StreamResult<()> {
1044 Ok(())
1045 }
1046
1047 fn check_write(&mut self) -> StreamResult<usize> {
1048 Ok(1024 * 1024)
1049 }
1050}
1051
1052#[async_trait::async_trait]
1053impl wasmtime_wasi::p2::Pollable for LogStream {
1054 async fn ready(&mut self) {}
1055}
1056
1057impl AsyncWrite for LogStream {
1058 fn poll_write(
1059 mut self: Pin<&mut Self>,
1060 _cx: &mut Context<'_>,
1061 buf: &[u8],
1062 ) -> Poll<io::Result<usize>> {
1063 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1064 }
1065 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1066 Poll::Ready(Ok(()))
1067 }
1068 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1069 Poll::Ready(Ok(()))
1070 }
1071}
1072
1073fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1095 use wasmtime::{Config, Memory, MemoryType};
1096 const BITS_TO_TEST: u32 = 42;
1097 let mut config = Config::new();
1098 config.wasm_memory64(true);
1099 config.memory_reservation(1 << BITS_TO_TEST);
1100 let engine = Engine::new(&config)?;
1101 let mut store = Store::new(&engine, ());
1102 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1105 if Memory::new(&mut store, ty).is_ok() {
1106 Ok(Some(true))
1107 } else {
1108 Ok(None)
1109 }
1110}