1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use bytes::Bytes;
4use clap::Parser;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::BoxBody;
8use std::convert::Infallible;
9use std::net::SocketAddr;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Instant;
13use std::{
14 path::PathBuf,
15 sync::{
16 Arc, Mutex,
17 atomic::{AtomicBool, AtomicU64, Ordering},
18 },
19 time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker, ResourceTable};
24use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
25use wasmtime_wasi::p2::{StreamError, StreamResult};
26use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
27use wasmtime_wasi_http::bindings as p2;
28use wasmtime_wasi_http::io::TokioIo;
29use wasmtime_wasi_http::{
30 DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
31 WasiHttpView,
32};
33
34#[cfg(feature = "wasi-config")]
35use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
36#[cfg(feature = "wasi-keyvalue")]
37use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
38#[cfg(feature = "wasi-nn")]
39use wasmtime_wasi_nn::wit::WasiNnCtx;
40
41struct Host {
42 table: wasmtime::component::ResourceTable,
43 ctx: WasiCtx,
44 http: WasiHttpCtx,
45 http_outgoing_body_buffer_chunks: Option<usize>,
46 http_outgoing_body_chunk_size: Option<usize>,
47
48 #[cfg(feature = "component-model-async")]
49 p3_http: DefaultP3Ctx,
50
51 limits: StoreLimits,
52
53 #[cfg(feature = "wasi-nn")]
54 nn: Option<WasiNnCtx>,
55
56 #[cfg(feature = "wasi-config")]
57 wasi_config: Option<WasiConfigVariables>,
58
59 #[cfg(feature = "wasi-keyvalue")]
60 wasi_keyvalue: Option<WasiKeyValueCtx>,
61
62 #[cfg(feature = "profiling")]
63 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
64}
65
66impl WasiView for Host {
67 fn ctx(&mut self) -> WasiCtxView<'_> {
68 WasiCtxView {
69 ctx: &mut self.ctx,
70 table: &mut self.table,
71 }
72 }
73}
74
75impl WasiHttpView for Host {
76 fn ctx(&mut self) -> &mut WasiHttpCtx {
77 &mut self.http
78 }
79 fn table(&mut self) -> &mut ResourceTable {
80 &mut self.table
81 }
82
83 fn outgoing_body_buffer_chunks(&mut self) -> usize {
84 self.http_outgoing_body_buffer_chunks
85 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
86 }
87
88 fn outgoing_body_chunk_size(&mut self) -> usize {
89 self.http_outgoing_body_chunk_size
90 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
91 }
92}
93
/// Unit type used as the p3 wasi-http context; it relies entirely on the
/// trait's provided behavior (the impl below overrides nothing).
#[cfg(feature = "component-model-async")]
struct DefaultP3Ctx;
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpCtx for DefaultP3Ctx {}
98
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
    /// Lends out the p3 wasi-http context together with the resource table.
    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
        let Self { table, p3_http, .. } = self;
        wasmtime_wasi_http::p3::WasiHttpCtxView {
            table,
            ctx: p3_http,
        }
    }
}
108
/// Default value of `--addr`: port 8080 on the unspecified IPv4 address (`0.0.0.0`).
const DEFAULT_ADDR: SocketAddr =
    SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 8080);
113
114#[derive(Parser)]
116pub struct ServeCommand {
117 #[command(flatten)]
118 run: RunCommon,
119
120 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
122 addr: SocketAddr,
123
124 #[arg(long, value_name = "SOCKADDR")]
129 shutdown_addr: Option<SocketAddr>,
130
131 #[arg(long)]
134 no_logging_prefix: bool,
135
136 #[arg(value_name = "WASM", required = true)]
138 component: PathBuf,
139}
140
141impl ServeCommand {
142 pub fn execute(mut self) -> Result<()> {
144 self.run.common.init_logging()?;
145
146 if self.run.common.wasi.nn == Some(true) {
150 #[cfg(not(feature = "wasi-nn"))]
151 {
152 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
153 }
154 }
155
156 if self.run.common.wasi.threads == Some(true) {
157 bail!("wasi-threads does not support components yet")
158 }
159
160 if self.run.common.wasi.http.replace(true) == Some(false) {
163 bail!("wasi-http is required for the serve command, and must not be disabled");
164 }
165 if self.run.common.wasm.component_model.replace(true) == Some(false) {
166 bail!("components are required for the serve command, and must not be disabled");
167 }
168
169 let runtime = tokio::runtime::Builder::new_multi_thread()
170 .enable_time()
171 .enable_io()
172 .build()?;
173
174 runtime.block_on(self.serve())?;
175
176 Ok(())
177 }
178
179 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
180 let mut builder = WasiCtxBuilder::new();
181 self.run.configure_wasip2(&mut builder)?;
182
183 builder.env("REQUEST_ID", req_id.to_string());
184
185 let stdout_prefix: String;
186 let stderr_prefix: String;
187 if self.no_logging_prefix {
188 stdout_prefix = "".to_string();
189 stderr_prefix = "".to_string();
190 } else {
191 stdout_prefix = format!("stdout [{req_id}] :: ");
192 stderr_prefix = format!("stderr [{req_id}] :: ");
193 }
194 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
195 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
196
197 let mut host = Host {
198 table: wasmtime::component::ResourceTable::new(),
199 ctx: builder.build(),
200 http: WasiHttpCtx::new(),
201 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
202 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
203
204 limits: StoreLimits::default(),
205
206 #[cfg(feature = "wasi-nn")]
207 nn: None,
208 #[cfg(feature = "wasi-config")]
209 wasi_config: None,
210 #[cfg(feature = "wasi-keyvalue")]
211 wasi_keyvalue: None,
212 #[cfg(feature = "profiling")]
213 guest_profiler: None,
214 #[cfg(feature = "component-model-async")]
215 p3_http: DefaultP3Ctx,
216 };
217
218 if self.run.common.wasi.nn == Some(true) {
219 #[cfg(feature = "wasi-nn")]
220 {
221 let graphs = self
222 .run
223 .common
224 .wasi
225 .nn_graph
226 .iter()
227 .map(|g| (g.format.clone(), g.dir.clone()))
228 .collect::<Vec<_>>();
229 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
230 host.nn.replace(WasiNnCtx::new(backends, registry));
231 }
232 }
233
234 if self.run.common.wasi.config == Some(true) {
235 #[cfg(feature = "wasi-config")]
236 {
237 let vars = WasiConfigVariables::from_iter(
238 self.run
239 .common
240 .wasi
241 .config_var
242 .iter()
243 .map(|v| (v.key.clone(), v.value.clone())),
244 );
245 host.wasi_config.replace(vars);
246 }
247 }
248
249 if self.run.common.wasi.keyvalue == Some(true) {
250 #[cfg(feature = "wasi-keyvalue")]
251 {
252 let ctx = WasiKeyValueCtxBuilder::new()
253 .in_memory_data(
254 self.run
255 .common
256 .wasi
257 .keyvalue_in_memory_data
258 .iter()
259 .map(|v| (v.key.clone(), v.value.clone())),
260 )
261 .build();
262 host.wasi_keyvalue.replace(ctx);
263 }
264 }
265
266 let mut store = Store::new(engine, host);
267
268 store.data_mut().limits = self.run.store_limits();
269 store.limiter(|t| &mut t.limits);
270
271 if let Some(fuel) = self.run.common.wasm.fuel {
274 store.set_fuel(fuel)?;
275 }
276
277 Ok(store)
278 }
279
280 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
281 self.run.validate_p3_option()?;
282 let cli = self.run.validate_cli_enabled()?;
283
284 if cli == Some(true) {
293 self.run.add_wasmtime_wasi_to_linker(linker)?;
294 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
295 #[cfg(feature = "component-model-async")]
296 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
297 wasmtime_wasi_http::p3::add_to_linker(linker)?;
298 }
299 } else {
300 wasmtime_wasi_http::add_to_linker_async(linker)?;
301 #[cfg(feature = "component-model-async")]
302 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
303 wasmtime_wasi_http::p3::add_to_linker(linker)?;
304 wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
305 wasmtime_wasi::p3::random::add_to_linker(linker)?;
306 wasmtime_wasi::p3::cli::add_to_linker(linker)?;
307 }
308 }
309
310 if self.run.common.wasi.nn == Some(true) {
311 #[cfg(not(feature = "wasi-nn"))]
312 {
313 bail!("support for wasi-nn was disabled at compile time");
314 }
315 #[cfg(feature = "wasi-nn")]
316 {
317 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
318 let ctx = h.nn.as_mut().unwrap();
319 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
320 })?;
321 }
322 }
323
324 if self.run.common.wasi.config == Some(true) {
325 #[cfg(not(feature = "wasi-config"))]
326 {
327 bail!("support for wasi-config was disabled at compile time");
328 }
329 #[cfg(feature = "wasi-config")]
330 {
331 wasmtime_wasi_config::add_to_linker(linker, |h| {
332 WasiConfig::from(h.wasi_config.as_ref().unwrap())
333 })?;
334 }
335 }
336
337 if self.run.common.wasi.keyvalue == Some(true) {
338 #[cfg(not(feature = "wasi-keyvalue"))]
339 {
340 bail!("support for wasi-keyvalue was disabled at compile time");
341 }
342 #[cfg(feature = "wasi-keyvalue")]
343 {
344 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
345 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
346 })?;
347 }
348 }
349
350 if self.run.common.wasi.threads == Some(true) {
351 bail!("support for wasi-threads is not available with components");
352 }
353
354 if self.run.common.wasi.http == Some(false) {
355 bail!("support for wasi-http must be enabled for `serve` subcommand");
356 }
357
358 Ok(())
359 }
360
361 async fn serve(mut self) -> Result<()> {
362 use hyper::server::conn::http1;
363
364 let mut config = self
365 .run
366 .common
367 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
368 config.wasm_component_model(true);
369 config.async_support(true);
370
371 if self.run.common.wasm.timeout.is_some() {
372 config.epoch_interruption(true);
373 }
374
375 match self.run.profile {
376 Some(Profile::Native(s)) => {
377 config.profiler(s);
378 }
379 Some(Profile::Guest { .. }) => {
380 config.epoch_interruption(true);
381 }
382 None => {}
383 }
384
385 let engine = Engine::new(&config)?;
386 let mut linker = Linker::new(&engine);
387
388 self.add_to_linker(&mut linker)?;
389
390 let component = match self.run.load_module(&engine, &self.component)? {
391 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
392 RunTarget::Component(c) => c,
393 };
394
395 let instance = linker.instantiate_pre(&component)?;
396 #[cfg(feature = "component-model-async")]
397 let instance = match wasmtime_wasi_http::p3::bindings::ProxyIndices::new(&instance) {
398 Ok(indices) => ProxyPre::P3(indices, instance),
399 Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
400 };
401 #[cfg(not(feature = "component-model-async"))]
402 let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
403
404 let shutdown = Arc::new(GracefulShutdown::default());
408 tokio::task::spawn({
409 let shutdown = shutdown.clone();
410 async move {
411 tokio::signal::ctrl_c().await.unwrap();
412 shutdown.requested.notify_one();
413 }
414 });
415 if let Some(addr) = self.shutdown_addr {
416 let listener = tokio::net::TcpListener::bind(addr).await?;
417 eprintln!(
418 "Listening for shutdown on tcp://{}/",
419 listener.local_addr()?
420 );
421 let shutdown = shutdown.clone();
422 tokio::task::spawn(async move {
423 let _ = listener.accept().await;
424 shutdown.requested.notify_one();
425 });
426 }
427
428 let socket = match &self.addr {
429 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
430 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
431 };
432 socket.set_reuseaddr(!cfg!(windows))?;
441 socket.bind(self.addr)?;
442 let listener = socket.listen(100)?;
443
444 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
445
446 log::info!("Listening on {}", self.addr);
447
448 let handler = ProxyHandler::new(self, engine, instance);
449
450 loop {
451 let (stream, _) = tokio::select! {
455 _ = shutdown.requested.notified() => break,
456 v = listener.accept() => v?,
457 };
458 let comp = component.clone();
459 let stream = TokioIo::new(stream);
460 let h = handler.clone();
461 let shutdown_guard = shutdown.clone().increment();
462 tokio::task::spawn(async move {
463 if let Err(e) = http1::Builder::new()
464 .keep_alive(true)
465 .serve_connection(
466 stream,
467 hyper::service::service_fn(move |req| {
468 let comp = comp.clone();
469 let h = h.clone();
470 async move {
471 use http_body_util::{BodyExt, Full};
472 match handle_request(h, req, comp).await {
473 Ok(r) => Ok::<_, Infallible>(r),
474 Err(e) => {
475 eprintln!("error: {e:?}");
476 let error_html = "\
477<!doctype html>
478<html>
479<head>
480 <title>500 Internal Server Error</title>
481</head>
482<body>
483 <center>
484 <h1>500 Internal Server Error</h1>
485 <hr>
486 wasmtime
487 </center>
488</body>
489</html>";
490 Ok(Response::builder()
491 .status(StatusCode::INTERNAL_SERVER_ERROR)
492 .header("Content-Type", "text/html; charset=UTF-8")
493 .body(
494 Full::new(bytes::Bytes::from(error_html))
495 .map_err(|_| unreachable!())
496 .boxed(),
497 )
498 .unwrap())
499 }
500 }
501 }
502 }),
503 )
504 .await
505 {
506 eprintln!("error: {e:?}");
507 }
508 drop(shutdown_guard);
509 });
510 }
511
512 if shutdown.close() {
518 return Ok(());
519 }
520 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
521 tokio::select! {
522 _ = tokio::signal::ctrl_c() => {}
523 _ = shutdown.complete.notified() => {}
524 }
525
526 Ok(())
527 }
528}
529
/// Shared state coordinating graceful shutdown between the accept loop and
/// the per-connection tasks.
#[derive(Default)]
struct GracefulShutdown {
    /// Notified when shutdown is requested (ctrl-c or the shutdown listener).
    requested: Notify,
    /// Notified by the last active task's guard once `close` has been called.
    complete: Notify,
    /// Task counter and "closed" flag, guarded by a mutex.
    state: Mutex<GracefulShutdownState>,
}

/// Mutable part of [`GracefulShutdown`].
#[derive(Default)]
struct GracefulShutdownState {
    /// Number of guards from `increment` that have not been dropped yet.
    active_tasks: u32,
    /// Set by `close`; afterwards the guard that brings the count to zero
    /// notifies `complete`.
    notify_when_done: bool,
}
547
548impl GracefulShutdown {
549 fn increment(self: Arc<Self>) -> impl Drop {
551 struct Guard(Arc<GracefulShutdown>);
552
553 let mut state = self.state.lock().unwrap();
554 assert!(!state.notify_when_done);
555 state.active_tasks += 1;
556 drop(state);
557
558 return Guard(self);
559
560 impl Drop for Guard {
561 fn drop(&mut self) {
562 let mut state = self.0.state.lock().unwrap();
563 state.active_tasks -= 1;
564 if state.notify_when_done && state.active_tasks == 0 {
565 self.0.complete.notify_one();
566 }
567 }
568 }
569 }
570
571 fn close(&self) -> bool {
574 let mut state = self.state.lock().unwrap();
575 state.notify_when_done = true;
576 state.active_tasks == 0
577 }
578}
579
/// Interval at which the epoch thread bumps the engine epoch when only a
/// timeout (and no guest profiler) is configured; see `setup_epoch_handler`.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
584
/// Handle to a background thread that periodically increments the engine's
/// epoch; the thread is stopped and joined when this value is dropped.
struct EpochThread {
    /// Flag the thread polls to know when to stop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; taken (set to `None`) by `Drop`.
    handle: Option<std::thread::JoinHandle<()>>,
}
589
590impl EpochThread {
591 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
592 let shutdown = Arc::new(AtomicBool::new(false));
593 let handle = {
594 let shutdown = Arc::clone(&shutdown);
595 let handle = std::thread::spawn(move || {
596 while !shutdown.load(Ordering::Relaxed) {
597 std::thread::sleep(interval);
598 engine.increment_epoch();
599 }
600 });
601 Some(handle)
602 };
603
604 EpochThread { shutdown, handle }
605 }
606}
607
608impl Drop for EpochThread {
609 fn drop(&mut self) {
610 if let Some(handle) = self.handle.take() {
611 self.shutdown.store(true, Ordering::Relaxed);
612 handle.join().unwrap();
613 }
614 }
615}
616
/// One-shot callback invoked with the store once a request's guest call has
/// finished; writes the guest profile, or does nothing when profiling is off.
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
618
619fn setup_epoch_handler(
620 cmd: &ServeCommand,
621 store: &mut Store<Host>,
622 component: Component,
623) -> Result<(WriteProfile, Option<EpochThread>)> {
624 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
626 #[cfg(feature = "profiling")]
627 return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
628 #[cfg(not(feature = "profiling"))]
629 {
630 let _ = (path, interval);
631 bail!("support for profiling disabled at compile time!");
632 }
633 }
634
635 let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
637 let start = Instant::now();
638 store.epoch_deadline_callback(move |_store| {
639 if start.elapsed() > timeout {
640 bail!("Timeout expired");
641 }
642 Ok(UpdateDeadline::Continue(1))
643 });
644 store.set_epoch_deadline(1);
645 let engine = store.engine().clone();
646 Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
647 } else {
648 None
649 };
650
651 Ok((Box::new(|_store| {}), epoch_thread))
652}
653
/// Installs a guest profiler on `store`.
///
/// The profiler is stored in the host state, fed from the store's call hook
/// and sampled from the epoch deadline callback, which also enforces the
/// optional timeout. An [`EpochThread`] ticking every `interval` drives the
/// sampling.
///
/// Returns a callback that writes the collected profile to `path` (reporting
/// the outcome on stderr) together with the epoch thread.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    let profiler = GuestProfiler::new_component(module_name, interval, component, std::iter::empty());
    store.data_mut().guest_profiler = Some(Arc::new(profiler));

    /// Temporarily takes the profiler out of the host state so it can be
    /// borrowed mutably alongside a shared store context, then puts it back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, Duration::ZERO)
        });

        // The elapsed time is only measured when a timeout is configured.
        if timeout.is_some_and(|limit| start.elapsed() > limit) {
            bail!("Timeout expired");
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        let written = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)));
        match written {
            Err(e) => {
                eprintln!("failed writing profile at {path}: {e:#}");
            }
            Ok(_) => {
                eprintln!();
                eprintln!("Profile written to: {path}");
                eprintln!("View this profile at https://profiler.firefox.com/.");
            }
        }
    });

    Ok((write_profile, epoch_thread))
}
731
/// State shared by all request handlers (held behind an `Arc` in [`ProxyHandler`]).
struct ProxyHandlerInner {
    /// The parsed command; used to build a store for each request.
    cmd: ServeCommand,
    /// Engine every per-request store is created in.
    engine: Engine,
    /// Pre-instantiated component, in p2 or p3 form.
    instance_pre: ProxyPre,
    /// Source of request ids, starting at 0.
    next_id: AtomicU64,
}
738
/// A pre-instantiated proxy component, for either WASI HTTP flavor.
enum ProxyPre {
    /// p2 bindings wrapping the pre-instantiated component.
    P2(p2::ProxyPre<Host>),
    /// p3 export indices plus the raw pre-instantiated component.
    #[cfg(feature = "component-model-async")]
    P3(
        wasmtime_wasi_http::p3::bindings::ProxyIndices,
        wasmtime::component::InstancePre<Host>,
    ),
}
747
748impl ProxyPre {
749 async fn instantiate(&self, store: &mut Store<Host>) -> Result<Proxy> {
750 Ok(match self {
751 ProxyPre::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
752 #[cfg(feature = "component-model-async")]
753 ProxyPre::P3(indices, pre) => {
754 let instance = pre.instantiate_async(&mut *store).await?;
755 let proxy = indices.load(&mut *store, &instance)?;
756 Proxy::P3(proxy, instance)
757 }
758 })
759 }
760}
761
/// An instantiated proxy component, produced by [`ProxyPre::instantiate`].
enum Proxy {
    /// p2 proxy bindings.
    P2(p2::Proxy),
    /// p3 proxy bindings together with the instance they were loaded from.
    #[cfg(feature = "component-model-async")]
    P3(
        wasmtime_wasi_http::p3::bindings::Proxy,
        wasmtime::component::Instance,
    ),
}
770
impl ProxyHandlerInner {
    /// Returns the next request id: the counter's current value, which is
    /// then incremented atomically.
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
776
777#[derive(Clone)]
778struct ProxyHandler(Arc<ProxyHandlerInner>);
779
780impl ProxyHandler {
781 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre) -> Self {
782 Self(Arc::new(ProxyHandlerInner {
783 cmd,
784 engine,
785 instance_pre,
786 next_id: AtomicU64::from(0),
787 }))
788 }
789}
790
/// Incoming HTTP request as delivered by hyper.
type Request = hyper::Request<hyper::body::Incoming>;
792
793async fn handle_request(
794 ProxyHandler(inner): ProxyHandler,
795 req: Request,
796 component: Component,
797) -> Result<hyper::Response<BoxBody<Bytes, anyhow::Error>>> {
798 let (sender, receiver) = tokio::sync::oneshot::channel();
799
800 let req_id = inner.next_req_id();
801
802 log::info!(
803 "Request {req_id} handling {} to {}",
804 req.method(),
805 req.uri()
806 );
807
808 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
809
810 let (write_profile, epoch_thread) =
811 setup_epoch_handler(&inner.cmd, &mut store, component.clone())?;
812
813 match inner.instance_pre.instantiate(&mut store).await? {
814 Proxy::P2(proxy) => {
815 let req = store
816 .data_mut()
817 .new_incoming_request(p2::http::types::Scheme::Http, req)?;
818 let out = store.data_mut().new_response_outparam(sender)?;
819 let task = tokio::task::spawn(async move {
820 if let Err(e) = proxy
821 .wasi_http_incoming_handler()
822 .call_handle(&mut store, req, out)
823 .await
824 {
825 log::error!("[{req_id}] :: {e:?}");
826 return Err(e);
827 }
828
829 write_profile(&mut store);
830 drop(epoch_thread);
831
832 Ok(())
833 });
834
835 let result = match receiver.await {
836 Ok(Ok(resp)) => resp,
837 Ok(Err(e)) => bail!(e),
838 Err(_) => {
839 let e = match task.await {
847 Ok(Ok(())) => {
848 bail!("guest never invoked `response-outparam::set` method")
849 }
850 Ok(Err(e)) => e,
851 Err(e) => e.into(),
852 };
853 bail!(e.context("guest never invoked `response-outparam::set` method"))
854 }
855 };
856
857 Ok(result.map(|body| body.map_err(|e| e.into()).boxed()))
858 }
859 #[cfg(feature = "component-model-async")]
860 Proxy::P3(proxy, instance) => {
861 use wasmtime_wasi_http::p3::bindings::http::types::{ErrorCode, Request};
862
863 let (tx, rx) = tokio::sync::oneshot::channel();
864
865 tokio::task::spawn(async move {
866 let guest_result = instance
867 .run_concurrent(&mut store, async move |store| {
868 let (req, body) = req.into_parts();
869 let body = body.map_err(ErrorCode::from_hyper_request_error);
870 let req = http::Request::from_parts(req, body);
871 let (request, request_io_result) = Request::from_http(req);
872 let (res, task) = proxy.handle(store, request).await??;
873 let res =
874 store.with(|mut store| res.into_http(&mut store, request_io_result))?;
875
876 _ = tx.send(res);
877
878 task.block(store).await;
879 anyhow::Ok(())
880 })
881 .await?;
882 if let Err(e) = guest_result {
883 log::error!("[{req_id}] :: {e:?}");
884 return Err(e);
885 }
886
887 write_profile(&mut store);
888 drop(epoch_thread);
889
890 anyhow::Ok(())
891 });
892 Ok(rx.await?.map(|body| body.map_err(|err| err.into()).boxed()))
893 }
894 }
895}
896
/// Which of the process's standard streams guest output is forwarded to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    /// Writes all of `buf` to the selected standard stream.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying stream.
    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
        use std::io::Write;

        match self {
            Self::Stdout => {
                let mut out = std::io::stdout();
                out.write_all(buf)
            }
            Self::Stderr => {
                let mut err = std::io::stderr();
                err.write_all(buf)
            }
        }
    }
}
913
/// Output stream given to the guest as stdout/stderr; it forwards to the
/// host's stream and puts a prefix at the start of every line.
#[derive(Clone)]
struct LogStream {
    /// Host stream the bytes are forwarded to.
    output: Output,
    /// Prefix and line-start flag, shared between clones of this stream.
    state: Arc<LogStreamState>,
}

/// State shared by all clones of a [`LogStream`].
struct LogStreamState {
    /// Text written at the start of each line (may be empty).
    prefix: String,
    /// True when the next byte written starts a new line and so needs the prefix.
    needs_prefix_on_next_write: AtomicBool,
}
924
925impl LogStream {
926 fn new(prefix: String, output: Output) -> LogStream {
927 LogStream {
928 output,
929 state: Arc::new(LogStreamState {
930 prefix,
931 needs_prefix_on_next_write: AtomicBool::new(true),
932 }),
933 }
934 }
935
936 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
937 while !bytes.is_empty() {
938 if self
939 .state
940 .needs_prefix_on_next_write
941 .load(Ordering::Relaxed)
942 {
943 self.output.write_all(self.state.prefix.as_bytes())?;
944 self.state
945 .needs_prefix_on_next_write
946 .store(false, Ordering::Relaxed);
947 }
948 match bytes.iter().position(|b| *b == b'\n') {
949 Some(i) => {
950 let (a, b) = bytes.split_at(i + 1);
951 bytes = b;
952 self.output.write_all(a)?;
953 self.state
954 .needs_prefix_on_next_write
955 .store(true, Ordering::Relaxed);
956 }
957 None => {
958 self.output.write_all(bytes)?;
959 break;
960 }
961 }
962 }
963
964 Ok(())
965 }
966}
967
968impl wasmtime_wasi::cli::StdoutStream for LogStream {
969 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
970 Box::new(self.clone())
971 }
972 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
973 Box::new(self.clone())
974 }
975}
976
977impl wasmtime_wasi::cli::IsTerminal for LogStream {
978 fn is_terminal(&self) -> bool {
979 match &self.output {
980 Output::Stdout => std::io::stdout().is_terminal(),
981 Output::Stderr => std::io::stderr().is_terminal(),
982 }
983 }
984}
985
986impl wasmtime_wasi::p2::OutputStream for LogStream {
987 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
988 self.write_all(&bytes)
989 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
990 Ok(())
991 }
992
993 fn flush(&mut self) -> StreamResult<()> {
994 Ok(())
995 }
996
997 fn check_write(&mut self) -> StreamResult<usize> {
998 Ok(1024 * 1024)
999 }
1000}
1001
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    /// Completes immediately: the stream is always considered ready.
    async fn ready(&mut self) {}
}
1006
1007impl AsyncWrite for LogStream {
1008 fn poll_write(
1009 mut self: Pin<&mut Self>,
1010 _cx: &mut Context<'_>,
1011 buf: &[u8],
1012 ) -> Poll<io::Result<usize>> {
1013 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1014 }
1015 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1016 Poll::Ready(Ok(()))
1017 }
1018 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1019 Poll::Ready(Ok(()))
1020 }
1021}
1022
1023fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1045 use wasmtime::{Config, Memory, MemoryType};
1046 const BITS_TO_TEST: u32 = 42;
1047 let mut config = Config::new();
1048 config.wasm_memory64(true);
1049 config.memory_reservation(1 << BITS_TO_TEST);
1050 let engine = Engine::new(&config)?;
1051 let mut store = Store::new(&engine, ());
1052 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1055 if Memory::new(&mut store, ty).is_ok() {
1056 Ok(Some(true))
1057 } else {
1058 Ok(None)
1059 }
1060}