1use crate::common::{HttpHooks, Profile, RunCommon, RunTarget};
2use bytes::Bytes;
3use clap::Parser;
4use futures::future::FutureExt;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::UnsyncBoxBody;
8use hyper::body::{Body, Frame, SizeHint};
9use std::convert::Infallible;
10use std::net::SocketAddr;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::{
14 path::PathBuf,
15 sync::{
16 Arc, Mutex,
17 atomic::{AtomicBool, Ordering},
18 },
19 time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker};
24use wasmtime::{
25 Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail, error::Context as _,
26};
27use wasmtime_cli_flags::opt::WasmtimeOptionValue;
28use wasmtime_wasi::p2::{StreamError, StreamResult};
29use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
30#[cfg(feature = "component-model-async")]
31use wasmtime_wasi_http::handler::p2::bindings as p2;
32use wasmtime_wasi_http::handler::{HandlerState, Proxy, ProxyHandler, ProxyPre, StoreBundle};
33use wasmtime_wasi_http::io::TokioIo;
34use wasmtime_wasi_http::{WasiHttpCtx, p2::WasiHttpView};
35
36#[cfg(feature = "wasi-config")]
37use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
38#[cfg(feature = "wasi-keyvalue")]
39use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
40#[cfg(feature = "wasi-nn")]
41use wasmtime_wasi_nn::wit::WasiNnCtx;
42
43const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
44const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
45const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
46
47struct Host {
48 table: wasmtime::component::ResourceTable,
49 ctx: WasiCtx,
50 http: WasiHttpCtx,
51 hooks: HttpHooks,
52
53 limits: StoreLimits,
54
55 #[cfg(feature = "wasi-nn")]
56 nn: Option<WasiNnCtx>,
57
58 #[cfg(feature = "wasi-config")]
59 wasi_config: Option<WasiConfigVariables>,
60
61 #[cfg(feature = "wasi-keyvalue")]
62 wasi_keyvalue: Option<WasiKeyValueCtx>,
63
64 #[cfg(feature = "profiling")]
65 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
66}
67
68impl WasiView for Host {
69 fn ctx(&mut self) -> WasiCtxView<'_> {
70 WasiCtxView {
71 ctx: &mut self.ctx,
72 table: &mut self.table,
73 }
74 }
75}
76
77impl wasmtime_wasi_http::p2::WasiHttpView for Host {
78 fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
79 wasmtime_wasi_http::p2::WasiHttpCtxView {
80 ctx: &mut self.http,
81 table: &mut self.table,
82 hooks: &mut self.hooks,
83 }
84 }
85}
86
87#[cfg(feature = "component-model-async")]
88impl wasmtime_wasi_http::p3::WasiHttpView for Host {
89 fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
90 wasmtime_wasi_http::p3::WasiHttpCtxView {
91 table: &mut self.table,
92 ctx: &mut self.http,
93 hooks: &mut self.hooks,
94 }
95 }
96}
97
98const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
99 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
100 8080,
101);
102
103fn parse_duration(s: &str) -> Result<Duration, String> {
104 Duration::parse(Some(s)).map_err(|e| e.to_string())
105}
106
107#[derive(Parser)]
109pub struct ServeCommand {
110 #[command(flatten)]
111 run: RunCommon,
112
113 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
115 addr: SocketAddr,
116
117 #[arg(long, value_name = "SOCKADDR")]
122 shutdown_addr: Option<SocketAddr>,
123
124 #[arg(long)]
127 no_logging_prefix: bool,
128
129 #[arg(value_name = "WASM", required = true)]
131 component: PathBuf,
132
133 #[arg(long)]
138 max_instance_reuse_count: Option<usize>,
139
140 #[arg(long)]
147 max_instance_concurrent_reuse_count: Option<usize>,
148
149 #[arg(long, default_value = "1s", value_parser = parse_duration)]
156 idle_instance_timeout: Duration,
157}
158
159impl ServeCommand {
160 pub fn execute(mut self) -> Result<()> {
162 self.run.common.init_logging()?;
163
164 if self.run.common.wasi.nn == Some(true) {
168 #[cfg(not(feature = "wasi-nn"))]
169 {
170 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
171 }
172 }
173
174 if self.run.common.wasi.threads == Some(true) {
175 bail!("wasi-threads does not support components yet")
176 }
177
178 if self.run.common.wasi.http.replace(true) == Some(false) {
181 bail!("wasi-http is required for the serve command, and must not be disabled");
182 }
183 if self.run.common.wasm.component_model.replace(true) == Some(false) {
184 bail!("components are required for the serve command, and must not be disabled");
185 }
186
187 let runtime = tokio::runtime::Builder::new_multi_thread()
188 .enable_time()
189 .enable_io()
190 .build()?;
191
192 runtime.block_on(self.serve())?;
193
194 Ok(())
195 }
196
197 fn new_store(&self, engine: &Engine, req_id: Option<u64>) -> Result<Store<Host>> {
198 let mut builder = WasiCtxBuilder::new();
199 self.run.configure_wasip2(&mut builder)?;
200
201 if let Some(req_id) = req_id {
202 builder.env("REQUEST_ID", req_id.to_string());
203 }
204
205 let stdout_prefix: String;
206 let stderr_prefix: String;
207 match req_id {
208 Some(req_id) if !self.no_logging_prefix => {
209 stdout_prefix = format!("stdout [{req_id}] :: ");
210 stderr_prefix = format!("stderr [{req_id}] :: ");
211 }
212 _ => {
213 stdout_prefix = "".to_string();
214 stderr_prefix = "".to_string();
215 }
216 }
217 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
218 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
219
220 let mut table = wasmtime::component::ResourceTable::new();
221 if let Some(max) = self.run.common.wasi.max_resources {
222 table.set_max_capacity(max);
223 }
224 let mut host = Host {
225 table,
226 ctx: builder.build(),
227 http: self.run.wasi_http_ctx()?,
228 hooks: self.run.wasi_http_hooks(),
229
230 limits: StoreLimits::default(),
231
232 #[cfg(feature = "wasi-nn")]
233 nn: None,
234 #[cfg(feature = "wasi-config")]
235 wasi_config: None,
236 #[cfg(feature = "wasi-keyvalue")]
237 wasi_keyvalue: None,
238 #[cfg(feature = "profiling")]
239 guest_profiler: None,
240 };
241
242 if self.run.common.wasi.nn == Some(true) {
243 #[cfg(feature = "wasi-nn")]
244 {
245 let graphs = self
246 .run
247 .common
248 .wasi
249 .nn_graph
250 .iter()
251 .map(|g| (g.format.clone(), g.dir.clone()))
252 .collect::<Vec<_>>();
253 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
254 host.nn.replace(WasiNnCtx::new(backends, registry));
255 }
256 }
257
258 if self.run.common.wasi.config == Some(true) {
259 #[cfg(feature = "wasi-config")]
260 {
261 let vars = WasiConfigVariables::from_iter(
262 self.run
263 .common
264 .wasi
265 .config_var
266 .iter()
267 .map(|v| (v.key.clone(), v.value.clone())),
268 );
269 host.wasi_config.replace(vars);
270 }
271 }
272
273 if self.run.common.wasi.keyvalue == Some(true) {
274 #[cfg(feature = "wasi-keyvalue")]
275 {
276 let ctx = WasiKeyValueCtxBuilder::new()
277 .in_memory_data(
278 self.run
279 .common
280 .wasi
281 .keyvalue_in_memory_data
282 .iter()
283 .map(|v| (v.key.clone(), v.value.clone())),
284 )
285 .build();
286 host.wasi_keyvalue.replace(ctx);
287 }
288 }
289
290 let mut store = Store::new(engine, host);
291
292 if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
293 store.set_hostcall_fuel(fuel);
294 }
295
296 store.data_mut().limits = self.run.store_limits();
297 store.limiter(|t| &mut t.limits);
298
299 if let Some(fuel) = self.run.common.wasm.fuel {
302 store.set_fuel(fuel)?;
303 }
304
305 Ok(store)
306 }
307
308 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
309 self.run.validate_p3_option()?;
310 let cli = self.run.validate_cli_enabled()?;
311
312 if cli == Some(true) {
321 self.run.add_wasmtime_wasi_to_linker(linker)?;
322 wasmtime_wasi_http::p2::add_only_http_to_linker_async(linker)?;
323 #[cfg(feature = "component-model-async")]
324 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
325 wasmtime_wasi_http::p3::add_to_linker(linker)?;
326 }
327 } else {
328 wasmtime_wasi_http::p2::add_to_linker_async(linker)?;
329 #[cfg(feature = "component-model-async")]
330 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
331 wasmtime_wasi_http::p3::add_to_linker(linker)?;
332 wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
333 wasmtime_wasi::p3::random::add_to_linker(linker)?;
334 wasmtime_wasi::p3::cli::add_to_linker(linker)?;
335 }
336 }
337
338 if self.run.common.wasi.nn == Some(true) {
339 #[cfg(not(feature = "wasi-nn"))]
340 {
341 bail!("support for wasi-nn was disabled at compile time");
342 }
343 #[cfg(feature = "wasi-nn")]
344 {
345 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
346 let ctx = h.nn.as_mut().unwrap();
347 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
348 })?;
349 }
350 }
351
352 if self.run.common.wasi.config == Some(true) {
353 #[cfg(not(feature = "wasi-config"))]
354 {
355 bail!("support for wasi-config was disabled at compile time");
356 }
357 #[cfg(feature = "wasi-config")]
358 {
359 wasmtime_wasi_config::add_to_linker(linker, |h| {
360 WasiConfig::from(h.wasi_config.as_ref().unwrap())
361 })?;
362 }
363 }
364
365 if self.run.common.wasi.keyvalue == Some(true) {
366 #[cfg(not(feature = "wasi-keyvalue"))]
367 {
368 bail!("support for wasi-keyvalue was disabled at compile time");
369 }
370 #[cfg(feature = "wasi-keyvalue")]
371 {
372 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
373 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
374 })?;
375 }
376 }
377
378 if self.run.common.wasi.threads == Some(true) {
379 bail!("support for wasi-threads is not available with components");
380 }
381
382 if self.run.common.wasi.http == Some(false) {
383 bail!("support for wasi-http must be enabled for `serve` subcommand");
384 }
385
386 Ok(())
387 }
388
389 async fn serve(mut self) -> Result<()> {
390 use hyper::server::conn::http1;
391
392 let mut config = self
393 .run
394 .common
395 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
396 config.wasm_component_model(true);
397
398 if self.run.common.wasm.timeout.is_some() {
399 config.epoch_interruption(true);
400 }
401
402 match self.run.profile {
403 Some(Profile::Native(s)) => {
404 config.profiler(s);
405 }
406 Some(Profile::Guest { .. }) => {
407 config.epoch_interruption(true);
408 }
409 None => {}
410 }
411
412 let engine = Engine::new(&config)?;
413 let mut linker = Linker::new(&engine);
414
415 self.add_to_linker(&mut linker)?;
416
417 let component = match self.run.load_module(&engine, &self.component)? {
418 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
419 RunTarget::Component(c) => c,
420 };
421
422 let instance = linker.instantiate_pre(&component)?;
423 #[cfg(feature = "component-model-async")]
424 let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
425 Ok(pre) => ProxyPre::P3(pre),
426 Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
427 };
428 #[cfg(not(feature = "component-model-async"))]
429 let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
430
431 let shutdown = Arc::new(GracefulShutdown::default());
435 tokio::task::spawn({
436 let shutdown = shutdown.clone();
437 async move {
438 tokio::signal::ctrl_c().await.unwrap();
439 shutdown.requested.notify_one();
440 }
441 });
442 if let Some(addr) = self.shutdown_addr {
443 let listener = tokio::net::TcpListener::bind(addr).await?;
444 eprintln!(
445 "Listening for shutdown on tcp://{}/",
446 listener.local_addr()?
447 );
448 let shutdown = shutdown.clone();
449 tokio::task::spawn(async move {
450 let _ = listener.accept().await;
451 shutdown.requested.notify_one();
452 });
453 }
454
455 let socket = match &self.addr {
456 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
457 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
458 };
459 socket.set_reuseaddr(!cfg!(windows))?;
468 socket.bind(self.addr)?;
469 let listener = socket.listen(100)?;
470
471 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
472
473 log::info!("Listening on {}", self.addr);
474
475 let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
476 Some(interval)
477 } else if let Some(t) = self.run.common.wasm.timeout {
478 Some(EPOCH_INTERRUPT_PERIOD.min(t))
479 } else {
480 None
481 };
482 let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));
483
484 let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
485 if let ProxyPre::P3(_) = &instance {
486 DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
487 } else {
488 DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
489 }
490 });
491
492 let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
493 self.max_instance_concurrent_reuse_count
494 .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
495 } else {
496 1
497 };
498
499 let handler = ProxyHandler::new(
500 HostHandlerState {
501 cmd: self,
502 engine,
503 component,
504 max_instance_reuse_count,
505 max_instance_concurrent_reuse_count,
506 },
507 instance,
508 );
509
510 loop {
511 let (stream, _) = tokio::select! {
515 _ = shutdown.requested.notified() => break,
516 v = listener.accept() => v?,
517 };
518
519 stream.set_nodelay(true)?;
525
526 let stream = TokioIo::new(stream);
527 let h = handler.clone();
528 let shutdown_guard = shutdown.clone().increment();
529 tokio::task::spawn(async move {
530 if let Err(e) = http1::Builder::new()
531 .keep_alive(true)
532 .serve_connection(
533 stream,
534 hyper::service::service_fn(move |req| {
535 let h = h.clone();
536 async move {
537 use http_body_util::{BodyExt, Full};
538 match handle_request(h, req).await {
539 Ok(r) => Ok::<_, Infallible>(r),
540 Err(e) => {
541 eprintln!("error: {e:?}");
542 let error_html = "\
543<!doctype html>
544<html>
545<head>
546 <title>500 Internal Server Error</title>
547</head>
548<body>
549 <center>
550 <h1>500 Internal Server Error</h1>
551 <hr>
552 wasmtime
553 </center>
554</body>
555</html>";
556 Ok(Response::builder()
557 .status(StatusCode::INTERNAL_SERVER_ERROR)
558 .header("Content-Type", "text/html; charset=UTF-8")
559 .body(
560 Full::new(bytes::Bytes::from(error_html))
561 .map_err(|_| unreachable!())
562 .boxed_unsync(),
563 )
564 .unwrap())
565 }
566 }
567 }
568 }),
569 )
570 .await
571 {
572 eprintln!("error: {e:?}");
573 }
574 drop(shutdown_guard);
575 });
576 }
577
578 if shutdown.close() {
584 return Ok(());
585 }
586 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
587 tokio::select! {
588 _ = tokio::signal::ctrl_c() => {}
589 _ = shutdown.complete.notified() => {}
590 }
591
592 Ok(())
593 }
594}
595
596struct HostHandlerState {
597 cmd: ServeCommand,
598 engine: Engine,
599 component: Component,
600 max_instance_reuse_count: usize,
601 max_instance_concurrent_reuse_count: usize,
602}
603
604impl HandlerState for HostHandlerState {
605 type StoreData = Host;
606
607 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Host>> {
608 let mut store = self.cmd.new_store(&self.engine, req_id)?;
609 let write_profile = setup_epoch_handler(&self.cmd, &mut store, self.component.clone())?;
610
611 Ok(StoreBundle {
612 store,
613 write_profile,
614 })
615 }
616
617 fn request_timeout(&self) -> Duration {
618 self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX)
619 }
620
621 fn idle_instance_timeout(&self) -> Duration {
622 self.cmd.idle_instance_timeout
623 }
624
625 fn max_instance_reuse_count(&self) -> usize {
626 self.max_instance_reuse_count
627 }
628
629 fn max_instance_concurrent_reuse_count(&self) -> usize {
630 self.max_instance_concurrent_reuse_count
631 }
632
633 fn handle_worker_error(&self, error: wasmtime::Error) {
634 eprintln!("worker error: {error}");
635 }
636}
637
638#[derive(Default)]
640struct GracefulShutdown {
641 requested: Notify,
643 complete: Notify,
646 state: Mutex<GracefulShutdownState>,
648}
649
650#[derive(Default)]
651struct GracefulShutdownState {
652 active_tasks: u32,
653 notify_when_done: bool,
654}
655
656impl GracefulShutdown {
657 fn increment(self: Arc<Self>) -> impl Drop {
659 struct Guard(Arc<GracefulShutdown>);
660
661 let mut state = self.state.lock().unwrap();
662 assert!(!state.notify_when_done);
663 state.active_tasks += 1;
664 drop(state);
665
666 return Guard(self);
667
668 impl Drop for Guard {
669 fn drop(&mut self) {
670 let mut state = self.0.state.lock().unwrap();
671 state.active_tasks -= 1;
672 if state.notify_when_done && state.active_tasks == 0 {
673 self.0.complete.notify_one();
674 }
675 }
676 }
677 }
678
679 fn close(&self) -> bool {
682 let mut state = self.state.lock().unwrap();
683 state.notify_when_done = true;
684 state.active_tasks == 0
685 }
686}
687
688const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
692
693struct EpochThread {
694 shutdown: Arc<AtomicBool>,
695 handle: Option<std::thread::JoinHandle<()>>,
696}
697
698impl EpochThread {
699 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
700 let shutdown = Arc::new(AtomicBool::new(false));
701 let handle = {
702 let shutdown = Arc::clone(&shutdown);
703 let handle = std::thread::spawn(move || {
704 while !shutdown.load(Ordering::Relaxed) {
705 std::thread::sleep(interval);
706 engine.increment_epoch();
707 }
708 });
709 Some(handle)
710 };
711
712 EpochThread { shutdown, handle }
713 }
714}
715
716impl Drop for EpochThread {
717 fn drop(&mut self) {
718 if let Some(handle) = self.handle.take() {
719 self.shutdown.store(true, Ordering::Relaxed);
720 handle.join().unwrap();
721 }
722 }
723}
724
725type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
726
727fn setup_epoch_handler(
728 cmd: &ServeCommand,
729 store: &mut Store<Host>,
730 component: Component,
731) -> Result<WriteProfile> {
732 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
734 #[cfg(feature = "profiling")]
735 return setup_guest_profiler(store, path.clone(), *interval, component.clone());
736 #[cfg(not(feature = "profiling"))]
737 {
738 let _ = (path, interval);
739 bail!("support for profiling disabled at compile time!");
740 }
741 }
742
743 if cmd.run.common.wasm.timeout.is_some() {
745 store.epoch_deadline_async_yield_and_update(1);
746 }
747
748 Ok(Box::new(|_store| {}))
749}
750
751#[cfg(feature = "profiling")]
752fn setup_guest_profiler(
753 store: &mut Store<Host>,
754 path: String,
755 interval: Duration,
756 component: Component,
757) -> Result<WriteProfile> {
758 use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
759
760 let module_name = "<main>";
761
762 store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
763 store.engine(),
764 module_name,
765 interval,
766 component,
767 std::iter::empty(),
768 )?));
769
770 fn sample(
771 mut store: StoreContextMut<Host>,
772 f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
773 ) {
774 let mut profiler = store.data_mut().guest_profiler.take().unwrap();
775 f(
776 Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
777 store.as_context(),
778 );
779 store.data_mut().guest_profiler = Some(profiler);
780 }
781
782 store.call_hook(|store, kind| {
784 sample(store, |profiler, store| profiler.call_hook(store, kind));
785 Ok(())
786 });
787
788 store.epoch_deadline_callback(move |store| {
789 sample(store, |profiler, store| {
790 profiler.sample(store, std::time::Duration::ZERO)
791 });
792
793 Ok(UpdateDeadline::Continue(1))
794 });
795
796 store.set_epoch_deadline(1);
797
798 let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
799 let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
800 .expect("profiling doesn't support threads yet");
801 if let Err(e) = std::fs::File::create(&path)
802 .map_err(wasmtime::Error::new)
803 .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
804 {
805 eprintln!("failed writing profile at {path}: {e:#}");
806 } else {
807 eprintln!();
808 eprintln!("Profile written to: {path}");
809 eprintln!("View this profile at https://profiler.firefox.com/.");
810 }
811 });
812
813 Ok(write_profile)
814}
815
816type Request = hyper::Request<hyper::body::Incoming>;
817
818async fn handle_request(
819 handler: ProxyHandler<HostHandlerState>,
820 req: Request,
821) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
822 use tokio::sync::oneshot;
823
824 let req_id = handler.next_req_id();
825
826 log::info!(
827 "Request {req_id} handling {} to {}",
828 req.method(),
829 req.uri()
830 );
831
832 type P2Response = Result<
839 hyper::Response<wasmtime_wasi_http::p2::body::HyperOutgoingBody>,
840 p2::http::types::ErrorCode,
841 >;
842 type P3Response = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;
843
844 enum Sender {
845 P2(oneshot::Sender<P2Response>),
846 P3(oneshot::Sender<P3Response>),
847 }
848
849 enum Receiver {
850 P2(oneshot::Receiver<P2Response>),
851 P3(oneshot::Receiver<P3Response>),
852 }
853
854 let (tx, rx) = match handler.instance_pre() {
855 ProxyPre::P2(_) => {
856 let (tx, rx) = oneshot::channel();
857 (Sender::P2(tx), Receiver::P2(rx))
858 }
859 ProxyPre::P3(_) => {
860 let (tx, rx) = oneshot::channel();
861 (Sender::P3(tx), Receiver::P3(rx))
862 }
863 };
864
865 handler.spawn(
866 if handler.state().max_instance_reuse_count() == 1 {
867 Some(req_id)
868 } else {
869 None
870 },
871 Box::new(move |store, proxy| {
872 Box::pin(
873 async move {
874 match proxy {
875 Proxy::P2(proxy) => {
876 let Sender::P2(tx) = tx else { unreachable!() };
877 let (req, out) = store.with(move |mut store| {
878 let req = store
879 .data_mut()
880 .http()
881 .new_incoming_request(p2::http::types::Scheme::Http, req)?;
882 let out = store.data_mut().http().new_response_outparam(tx)?;
883 wasmtime::error::Ok((req, out))
884 })?;
885
886 proxy
887 .wasi_http_incoming_handler()
888 .call_handle(store, req, out)
889 .await
890 }
891 Proxy::P3(proxy) => {
892 use wasmtime_wasi_http::p3::bindings::http::types::{
893 ErrorCode, Request,
894 };
895
896 let Sender::P3(tx) = tx else { unreachable!() };
897 let (req, body) = req.into_parts();
898 let body = body.map_err(ErrorCode::from_hyper_request_error);
899 let req = http::Request::from_parts(req, body);
900 let (request, request_io_result) = Request::from_http(req);
901 let res = proxy.handle(store, request).await??;
902 let res = store
903 .with(|mut store| res.into_http(&mut store, request_io_result))?;
904
905 let (resp_body_tx, resp_body_rx) = oneshot::channel();
913 let res = res.map(|body| {
914 let body = body.map_err(|e| e.into());
915 P3BodyWrapper {
916 _tx: resp_body_tx,
917 body,
918 }
919 .boxed_unsync()
920 });
921
922 if tx.send(res).is_ok() {
927 _ = resp_body_rx.await;
928 }
929
930 Ok(())
931 }
932 }
933 }
934 .map(move |result| {
935 if let Err(error) = result {
936 eprintln!("[{req_id}] :: {error:?}");
937 }
938 }),
939 )
940 }),
941 );
942
943 return Ok(match rx {
944 Receiver::P2(rx) => rx
945 .await
946 .context("guest never invoked `response-outparam::set` method")?
947 .map_err(|e| wasmtime::Error::from(e))?
948 .map(|body| body.map_err(|e| e.into()).boxed_unsync()),
949 Receiver::P3(rx) => rx.await?,
950 });
951
952 struct P3BodyWrapper<B> {
955 body: B,
956 _tx: oneshot::Sender<()>,
957 }
958
959 impl<B: Body + Unpin> Body for P3BodyWrapper<B> {
960 type Data = B::Data;
961 type Error = B::Error;
962
963 fn poll_frame(
964 mut self: Pin<&mut Self>,
965 cx: &mut Context<'_>,
966 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
967 Pin::new(&mut self.body).poll_frame(cx)
968 }
969
970 fn is_end_stream(&self) -> bool {
971 self.body.is_end_stream()
972 }
973
974 fn size_hint(&self) -> SizeHint {
975 self.body.size_hint()
976 }
977 }
978}
979
980#[derive(Clone)]
981enum Output {
982 Stdout,
983 Stderr,
984}
985
986impl Output {
987 fn write_all(&self, buf: &[u8]) -> io::Result<()> {
988 use std::io::Write;
989
990 match self {
991 Output::Stdout => std::io::stdout().write_all(buf),
992 Output::Stderr => std::io::stderr().write_all(buf),
993 }
994 }
995}
996
997#[derive(Clone)]
998struct LogStream {
999 output: Output,
1000 state: Arc<LogStreamState>,
1001}
1002
1003struct LogStreamState {
1004 prefix: String,
1005 needs_prefix_on_next_write: AtomicBool,
1006}
1007
1008impl LogStream {
1009 fn new(prefix: String, output: Output) -> LogStream {
1010 LogStream {
1011 output,
1012 state: Arc::new(LogStreamState {
1013 prefix,
1014 needs_prefix_on_next_write: AtomicBool::new(true),
1015 }),
1016 }
1017 }
1018
1019 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
1020 while !bytes.is_empty() {
1021 if self
1022 .state
1023 .needs_prefix_on_next_write
1024 .load(Ordering::Relaxed)
1025 {
1026 self.output.write_all(self.state.prefix.as_bytes())?;
1027 self.state
1028 .needs_prefix_on_next_write
1029 .store(false, Ordering::Relaxed);
1030 }
1031 match bytes.iter().position(|b| *b == b'\n') {
1032 Some(i) => {
1033 let (a, b) = bytes.split_at(i + 1);
1034 bytes = b;
1035 self.output.write_all(a)?;
1036 self.state
1037 .needs_prefix_on_next_write
1038 .store(true, Ordering::Relaxed);
1039 }
1040 None => {
1041 self.output.write_all(bytes)?;
1042 break;
1043 }
1044 }
1045 }
1046
1047 Ok(())
1048 }
1049}
1050
1051impl wasmtime_wasi::cli::StdoutStream for LogStream {
1052 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
1053 Box::new(self.clone())
1054 }
1055 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
1056 Box::new(self.clone())
1057 }
1058}
1059
1060impl wasmtime_wasi::cli::IsTerminal for LogStream {
1061 fn is_terminal(&self) -> bool {
1062 match &self.output {
1063 Output::Stdout => std::io::stdout().is_terminal(),
1064 Output::Stderr => std::io::stderr().is_terminal(),
1065 }
1066 }
1067}
1068
1069impl wasmtime_wasi::p2::OutputStream for LogStream {
1070 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
1071 self.write_all(&bytes)
1072 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
1073 Ok(())
1074 }
1075
1076 fn flush(&mut self) -> StreamResult<()> {
1077 Ok(())
1078 }
1079
1080 fn check_write(&mut self) -> StreamResult<usize> {
1081 Ok(1024 * 1024)
1082 }
1083}
1084
1085#[async_trait::async_trait]
1086impl wasmtime_wasi::p2::Pollable for LogStream {
1087 async fn ready(&mut self) {}
1088}
1089
1090impl AsyncWrite for LogStream {
1091 fn poll_write(
1092 mut self: Pin<&mut Self>,
1093 _cx: &mut Context<'_>,
1094 buf: &[u8],
1095 ) -> Poll<io::Result<usize>> {
1096 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1097 }
1098 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1099 Poll::Ready(Ok(()))
1100 }
1101 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1102 Poll::Ready(Ok(()))
1103 }
1104}
1105
1106fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1128 use wasmtime::{Config, Memory, MemoryType};
1129 const BITS_TO_TEST: u32 = 42;
1130 let mut config = Config::new();
1131 config.wasm_memory64(true);
1132 config.memory_reservation(1 << BITS_TO_TEST);
1133 let engine = Engine::new(&config)?;
1134 let mut store = Store::new(&engine, ());
1135 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1138 if Memory::new(&mut store, ty).is_ok() {
1139 Ok(Some(true))
1140 } else {
1141 Ok(None)
1142 }
1143}