1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use bytes::Bytes;
4use clap::Parser;
5use http::{Response, StatusCode};
6use http_body_util::BodyExt as _;
7use http_body_util::combinators::BoxBody;
8use std::convert::Infallible;
9use std::net::SocketAddr;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Instant;
13use std::{
14 path::PathBuf,
15 sync::{
16 Arc, Mutex,
17 atomic::{AtomicBool, AtomicU64, Ordering},
18 },
19 time::Duration,
20};
21use tokio::io::{self, AsyncWrite};
22use tokio::sync::Notify;
23use wasmtime::component::{Component, Linker, ResourceTable};
24use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
25use wasmtime_wasi::p2::{StreamError, StreamResult};
26use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
27use wasmtime_wasi_http::bindings as p2;
28use wasmtime_wasi_http::io::TokioIo;
29use wasmtime_wasi_http::{
30 DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
31 WasiHttpView,
32};
33
34#[cfg(feature = "wasi-config")]
35use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
36#[cfg(feature = "wasi-keyvalue")]
37use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
38#[cfg(feature = "wasi-nn")]
39use wasmtime_wasi_nn::wit::WasiNnCtx;
40
/// Per-request host state stored inside each `Store`.
///
/// A fresh value is built by `ServeCommand::new_store` for every request.
struct Host {
    /// Resource table handed out by the WASI, wasi-http, and optional
    /// wasi-nn / wasi-keyvalue views.
    table: wasmtime::component::ResourceTable,
    /// WASI context exposed through the `WasiView` impl.
    ctx: WasiCtx,
    /// wasi-http context exposed through the `WasiHttpView` impl.
    http: WasiHttpCtx,
    /// Optional override for `outgoing_body_buffer_chunks`; `None` falls back
    /// to `DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS`.
    http_outgoing_body_buffer_chunks: Option<usize>,
    /// Optional override for `outgoing_body_chunk_size`; `None` falls back to
    /// `DEFAULT_OUTGOING_BODY_CHUNK_SIZE`.
    http_outgoing_body_chunk_size: Option<usize>,

    /// Context exposed through the `p3::WasiHttpView` impl.
    #[cfg(feature = "component-model-async")]
    p3_http: crate::common::DefaultP3Ctx,

    /// Limits registered with the store via `Store::limiter` in `new_store`.
    limits: StoreLimits,

    /// wasi-nn state; filled in by `new_store` only when wasi-nn was requested.
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    /// wasi-config variables; filled in by `new_store` only when wasi-config
    /// was requested.
    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    /// wasi-keyvalue state; filled in by `new_store` only when wasi-keyvalue
    /// was requested.
    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    /// Guest profiler installed by `setup_guest_profiler` and taken back out
    /// when the profile is written.
    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
65
66impl WasiView for Host {
67 fn ctx(&mut self) -> WasiCtxView<'_> {
68 WasiCtxView {
69 ctx: &mut self.ctx,
70 table: &mut self.table,
71 }
72 }
73}
74
75impl WasiHttpView for Host {
76 fn ctx(&mut self) -> &mut WasiHttpCtx {
77 &mut self.http
78 }
79 fn table(&mut self) -> &mut ResourceTable {
80 &mut self.table
81 }
82
83 fn outgoing_body_buffer_chunks(&mut self) -> usize {
84 self.http_outgoing_body_buffer_chunks
85 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
86 }
87
88 fn outgoing_body_chunk_size(&mut self) -> usize {
89 self.http_outgoing_body_chunk_size
90 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
91 }
92}
93
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
    /// Lends out the P3 wasi-http context together with the shared resource table.
    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
        // Destructuring splits the borrow of `self` into two disjoint `&mut`s.
        let Self { table, p3_http, .. } = self;
        wasmtime_wasi_http::p3::WasiHttpCtxView {
            ctx: p3_http,
            table,
        }
    }
}
103
/// Default value of `--addr`: every IPv4 interface (`0.0.0.0`) on port 8080.
const DEFAULT_ADDR: SocketAddr = SocketAddr::V4(std::net::SocketAddrV4::new(
    std::net::Ipv4Addr::UNSPECIFIED,
    8080,
));
108
109#[derive(Parser)]
111pub struct ServeCommand {
112 #[command(flatten)]
113 run: RunCommon,
114
115 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
117 addr: SocketAddr,
118
119 #[arg(long, value_name = "SOCKADDR")]
124 shutdown_addr: Option<SocketAddr>,
125
126 #[arg(long)]
129 no_logging_prefix: bool,
130
131 #[arg(value_name = "WASM", required = true)]
133 component: PathBuf,
134}
135
136impl ServeCommand {
137 pub fn execute(mut self) -> Result<()> {
139 self.run.common.init_logging()?;
140
141 if self.run.common.wasi.nn == Some(true) {
145 #[cfg(not(feature = "wasi-nn"))]
146 {
147 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
148 }
149 }
150
151 if self.run.common.wasi.threads == Some(true) {
152 bail!("wasi-threads does not support components yet")
153 }
154
155 if self.run.common.wasi.http.replace(true) == Some(false) {
158 bail!("wasi-http is required for the serve command, and must not be disabled");
159 }
160 if self.run.common.wasm.component_model.replace(true) == Some(false) {
161 bail!("components are required for the serve command, and must not be disabled");
162 }
163
164 let runtime = tokio::runtime::Builder::new_multi_thread()
165 .enable_time()
166 .enable_io()
167 .build()?;
168
169 runtime.block_on(self.serve())?;
170
171 Ok(())
172 }
173
174 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
175 let mut builder = WasiCtxBuilder::new();
176 self.run.configure_wasip2(&mut builder)?;
177
178 builder.env("REQUEST_ID", req_id.to_string());
179
180 let stdout_prefix: String;
181 let stderr_prefix: String;
182 if self.no_logging_prefix {
183 stdout_prefix = "".to_string();
184 stderr_prefix = "".to_string();
185 } else {
186 stdout_prefix = format!("stdout [{req_id}] :: ");
187 stderr_prefix = format!("stderr [{req_id}] :: ");
188 }
189 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
190 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
191
192 let mut host = Host {
193 table: wasmtime::component::ResourceTable::new(),
194 ctx: builder.build(),
195 http: WasiHttpCtx::new(),
196 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
197 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
198
199 limits: StoreLimits::default(),
200
201 #[cfg(feature = "wasi-nn")]
202 nn: None,
203 #[cfg(feature = "wasi-config")]
204 wasi_config: None,
205 #[cfg(feature = "wasi-keyvalue")]
206 wasi_keyvalue: None,
207 #[cfg(feature = "profiling")]
208 guest_profiler: None,
209 #[cfg(feature = "component-model-async")]
210 p3_http: crate::common::DefaultP3Ctx,
211 };
212
213 if self.run.common.wasi.nn == Some(true) {
214 #[cfg(feature = "wasi-nn")]
215 {
216 let graphs = self
217 .run
218 .common
219 .wasi
220 .nn_graph
221 .iter()
222 .map(|g| (g.format.clone(), g.dir.clone()))
223 .collect::<Vec<_>>();
224 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
225 host.nn.replace(WasiNnCtx::new(backends, registry));
226 }
227 }
228
229 if self.run.common.wasi.config == Some(true) {
230 #[cfg(feature = "wasi-config")]
231 {
232 let vars = WasiConfigVariables::from_iter(
233 self.run
234 .common
235 .wasi
236 .config_var
237 .iter()
238 .map(|v| (v.key.clone(), v.value.clone())),
239 );
240 host.wasi_config.replace(vars);
241 }
242 }
243
244 if self.run.common.wasi.keyvalue == Some(true) {
245 #[cfg(feature = "wasi-keyvalue")]
246 {
247 let ctx = WasiKeyValueCtxBuilder::new()
248 .in_memory_data(
249 self.run
250 .common
251 .wasi
252 .keyvalue_in_memory_data
253 .iter()
254 .map(|v| (v.key.clone(), v.value.clone())),
255 )
256 .build();
257 host.wasi_keyvalue.replace(ctx);
258 }
259 }
260
261 let mut store = Store::new(engine, host);
262
263 store.data_mut().limits = self.run.store_limits();
264 store.limiter(|t| &mut t.limits);
265
266 if let Some(fuel) = self.run.common.wasm.fuel {
269 store.set_fuel(fuel)?;
270 }
271
272 Ok(store)
273 }
274
275 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
276 self.run.validate_p3_option()?;
277 let cli = self.run.validate_cli_enabled()?;
278
279 if cli == Some(true) {
288 self.run.add_wasmtime_wasi_to_linker(linker)?;
289 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
290 #[cfg(feature = "component-model-async")]
291 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
292 wasmtime_wasi_http::p3::add_to_linker(linker)?;
293 }
294 } else {
295 wasmtime_wasi_http::add_to_linker_async(linker)?;
296 #[cfg(feature = "component-model-async")]
297 if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
298 wasmtime_wasi_http::p3::add_to_linker(linker)?;
299 wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
300 wasmtime_wasi::p3::random::add_to_linker(linker)?;
301 wasmtime_wasi::p3::cli::add_to_linker(linker)?;
302 }
303 }
304
305 if self.run.common.wasi.nn == Some(true) {
306 #[cfg(not(feature = "wasi-nn"))]
307 {
308 bail!("support for wasi-nn was disabled at compile time");
309 }
310 #[cfg(feature = "wasi-nn")]
311 {
312 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
313 let ctx = h.nn.as_mut().unwrap();
314 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
315 })?;
316 }
317 }
318
319 if self.run.common.wasi.config == Some(true) {
320 #[cfg(not(feature = "wasi-config"))]
321 {
322 bail!("support for wasi-config was disabled at compile time");
323 }
324 #[cfg(feature = "wasi-config")]
325 {
326 wasmtime_wasi_config::add_to_linker(linker, |h| {
327 WasiConfig::from(h.wasi_config.as_ref().unwrap())
328 })?;
329 }
330 }
331
332 if self.run.common.wasi.keyvalue == Some(true) {
333 #[cfg(not(feature = "wasi-keyvalue"))]
334 {
335 bail!("support for wasi-keyvalue was disabled at compile time");
336 }
337 #[cfg(feature = "wasi-keyvalue")]
338 {
339 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
340 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
341 })?;
342 }
343 }
344
345 if self.run.common.wasi.threads == Some(true) {
346 bail!("support for wasi-threads is not available with components");
347 }
348
349 if self.run.common.wasi.http == Some(false) {
350 bail!("support for wasi-http must be enabled for `serve` subcommand");
351 }
352
353 Ok(())
354 }
355
356 async fn serve(mut self) -> Result<()> {
357 use hyper::server::conn::http1;
358
359 let mut config = self
360 .run
361 .common
362 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
363 config.wasm_component_model(true);
364 config.async_support(true);
365
366 if self.run.common.wasm.timeout.is_some() {
367 config.epoch_interruption(true);
368 }
369
370 match self.run.profile {
371 Some(Profile::Native(s)) => {
372 config.profiler(s);
373 }
374 Some(Profile::Guest { .. }) => {
375 config.epoch_interruption(true);
376 }
377 None => {}
378 }
379
380 let engine = Engine::new(&config)?;
381 let mut linker = Linker::new(&engine);
382
383 self.add_to_linker(&mut linker)?;
384
385 let component = match self.run.load_module(&engine, &self.component)? {
386 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
387 RunTarget::Component(c) => c,
388 };
389
390 let instance = linker.instantiate_pre(&component)?;
391 #[cfg(feature = "component-model-async")]
392 let instance = match wasmtime_wasi_http::p3::bindings::ProxyIndices::new(&instance) {
393 Ok(indices) => ProxyPre::P3(indices, instance),
394 Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
395 };
396 #[cfg(not(feature = "component-model-async"))]
397 let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);
398
399 let shutdown = Arc::new(GracefulShutdown::default());
403 tokio::task::spawn({
404 let shutdown = shutdown.clone();
405 async move {
406 tokio::signal::ctrl_c().await.unwrap();
407 shutdown.requested.notify_one();
408 }
409 });
410 if let Some(addr) = self.shutdown_addr {
411 let listener = tokio::net::TcpListener::bind(addr).await?;
412 eprintln!(
413 "Listening for shutdown on tcp://{}/",
414 listener.local_addr()?
415 );
416 let shutdown = shutdown.clone();
417 tokio::task::spawn(async move {
418 let _ = listener.accept().await;
419 shutdown.requested.notify_one();
420 });
421 }
422
423 let socket = match &self.addr {
424 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
425 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
426 };
427 socket.set_reuseaddr(!cfg!(windows))?;
436 socket.bind(self.addr)?;
437 let listener = socket.listen(100)?;
438
439 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
440
441 log::info!("Listening on {}", self.addr);
442
443 let handler = ProxyHandler::new(self, engine, instance);
444
445 loop {
446 let (stream, _) = tokio::select! {
450 _ = shutdown.requested.notified() => break,
451 v = listener.accept() => v?,
452 };
453 let comp = component.clone();
454 let stream = TokioIo::new(stream);
455 let h = handler.clone();
456 let shutdown_guard = shutdown.clone().increment();
457 tokio::task::spawn(async move {
458 if let Err(e) = http1::Builder::new()
459 .keep_alive(true)
460 .serve_connection(
461 stream,
462 hyper::service::service_fn(move |req| {
463 let comp = comp.clone();
464 let h = h.clone();
465 async move {
466 use http_body_util::{BodyExt, Full};
467 match handle_request(h, req, comp).await {
468 Ok(r) => Ok::<_, Infallible>(r),
469 Err(e) => {
470 eprintln!("error: {e:?}");
471 let error_html = "\
472<!doctype html>
473<html>
474<head>
475 <title>500 Internal Server Error</title>
476</head>
477<body>
478 <center>
479 <h1>500 Internal Server Error</h1>
480 <hr>
481 wasmtime
482 </center>
483</body>
484</html>";
485 Ok(Response::builder()
486 .status(StatusCode::INTERNAL_SERVER_ERROR)
487 .header("Content-Type", "text/html; charset=UTF-8")
488 .body(
489 Full::new(bytes::Bytes::from(error_html))
490 .map_err(|_| unreachable!())
491 .boxed(),
492 )
493 .unwrap())
494 }
495 }
496 }
497 }),
498 )
499 .await
500 {
501 eprintln!("error: {e:?}");
502 }
503 drop(shutdown_guard);
504 });
505 }
506
507 if shutdown.close() {
513 return Ok(());
514 }
515 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
516 tokio::select! {
517 _ = tokio::signal::ctrl_c() => {}
518 _ = shutdown.complete.notified() => {}
519 }
520
521 Ok(())
522 }
523}
524
/// Coordination state for shutting the server down once in-flight
/// connection tasks have finished.
#[derive(Default)]
struct GracefulShutdown {
    /// Notified when a shutdown is requested (ctrl-c or a connection to the
    /// shutdown address); awaited by the accept loop in `serve`.
    requested: Notify,
    /// Notified by the last task guard to drop after `close` has been called.
    complete: Notify,
    /// Active-task counter plus the "notify when done" flag.
    state: Mutex<GracefulShutdownState>,
}
536
/// Mutable bookkeeping guarded by `GracefulShutdown::state`.
#[derive(Default)]
struct GracefulShutdownState {
    /// Number of guards handed out by `increment` that have not been dropped yet.
    active_tasks: u32,
    /// Set by `close`; once true, the guard that brings `active_tasks` to zero
    /// notifies `complete`.
    notify_when_done: bool,
}
542
543impl GracefulShutdown {
544 fn increment(self: Arc<Self>) -> impl Drop {
546 struct Guard(Arc<GracefulShutdown>);
547
548 let mut state = self.state.lock().unwrap();
549 assert!(!state.notify_when_done);
550 state.active_tasks += 1;
551 drop(state);
552
553 return Guard(self);
554
555 impl Drop for Guard {
556 fn drop(&mut self) {
557 let mut state = self.0.state.lock().unwrap();
558 state.active_tasks -= 1;
559 if state.notify_when_done && state.active_tasks == 0 {
560 self.0.complete.notify_one();
561 }
562 }
563 }
564 }
565
566 fn close(&self) -> bool {
569 let mut state = self.state.lock().unwrap();
570 state.notify_when_done = true;
571 state.active_tasks == 0
572 }
573}
574
/// Interval at which the epoch thread increments the engine epoch when only a
/// timeout (no guest profiler) is configured; see `setup_epoch_handler`.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
579
/// Handle to a background thread that periodically increments an engine's
/// epoch. Dropping the handle stops the thread and joins it.
struct EpochThread {
    /// Flag polled by the thread; set to `true` on drop to make it exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; taken (left as `None`) once joined.
    handle: Option<std::thread::JoinHandle<()>>,
}
584
585impl EpochThread {
586 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
587 let shutdown = Arc::new(AtomicBool::new(false));
588 let handle = {
589 let shutdown = Arc::clone(&shutdown);
590 let handle = std::thread::spawn(move || {
591 while !shutdown.load(Ordering::Relaxed) {
592 std::thread::sleep(interval);
593 engine.increment_epoch();
594 }
595 });
596 Some(handle)
597 };
598
599 EpochThread { shutdown, handle }
600 }
601}
602
603impl Drop for EpochThread {
604 fn drop(&mut self) {
605 if let Some(handle) = self.handle.take() {
606 self.shutdown.store(true, Ordering::Relaxed);
607 handle.join().unwrap();
608 }
609 }
610}
611
/// One-shot callback run after a request's guest call succeeds; it writes the
/// guest profile when profiling is active and is a no-op otherwise.
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
613
614fn setup_epoch_handler(
615 cmd: &ServeCommand,
616 store: &mut Store<Host>,
617 component: Component,
618) -> Result<(WriteProfile, Option<EpochThread>)> {
619 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
621 #[cfg(feature = "profiling")]
622 return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
623 #[cfg(not(feature = "profiling"))]
624 {
625 let _ = (path, interval);
626 bail!("support for profiling disabled at compile time!");
627 }
628 }
629
630 let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
632 let start = Instant::now();
633 store.epoch_deadline_callback(move |_store| {
634 if start.elapsed() > timeout {
635 bail!("Timeout expired");
636 }
637 Ok(UpdateDeadline::Continue(1))
638 });
639 store.set_epoch_deadline(1);
640 let engine = store.engine().clone();
641 Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
642 } else {
643 None
644 };
645
646 Ok((Box::new(|_store| {}), epoch_thread))
647}
648
/// Attaches a guest profiler to `store` for one request.
///
/// The profiler is driven from the store's call hook and from the
/// epoch-deadline callback, which also enforces the configured timeout (if
/// any). An `EpochThread` ticking every `interval` is started.
///
/// Returns a callback that writes the finished profile to `path` and reports
/// the outcome on stderr, plus the epoch thread handle.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    /// Temporarily takes the profiler out of the host state so it can be used
    /// alongside a shared view of the store, then puts it back.
    fn with_profiler(
        mut cx: StoreContextMut<Host>,
        action: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut shared = cx.data_mut().guest_profiler.take().unwrap();
        let exclusive = Arc::get_mut(&mut shared).expect("profiling doesn't support threads yet");
        action(exclusive, cx.as_context());
        cx.data_mut().guest_profiler = Some(shared);
    }

    let profiler = GuestProfiler::new_component("<main>", interval, component, std::iter::empty());
    store.data_mut().guest_profiler = Some(Arc::new(profiler));

    store.call_hook(|cx, kind| {
        with_profiler(cx, |profiler, cx| profiler.call_hook(cx, kind));
        Ok(())
    });

    let started = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    store.epoch_deadline_callback(move |cx| {
        with_profiler(cx, |profiler, cx| {
            profiler.sample(cx, std::time::Duration::ZERO)
        });

        if timeout.is_some_and(|limit| started.elapsed() > limit) {
            bail!("Timeout expired");
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let epoch_thread = Some(EpochThread::spawn(interval, store.engine().clone()));

    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        let outcome = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|file| profiler.finish(std::io::BufWriter::new(file)));
        match outcome {
            Err(e) => {
                eprintln!("failed writing profile at {path}: {e:#}");
            }
            Ok(_) => {
                eprintln!();
                eprintln!("Profile written to: {path}");
                eprintln!("View this profile at https://profiler.firefox.com/.");
            }
        }
    });

    Ok((write_profile, epoch_thread))
}
726
/// State shared by every connection task through `ProxyHandler`.
struct ProxyHandlerInner {
    /// The parsed command; used to build a store for each request.
    cmd: ServeCommand,
    /// Engine every per-request store is created from.
    engine: Engine,
    /// Pre-instantiated component, instantiated once per request.
    instance_pre: ProxyPre,
    /// Source of request ids; see `next_req_id`.
    next_id: AtomicU64,
}
733
/// A pre-instantiated proxy component, in either the P2 or the P3 flavour.
enum ProxyPre {
    /// P2 bindings wrapping the pre-instantiated component.
    P2(p2::ProxyPre<Host>),
    /// P3 export indices together with the raw pre-instantiated component.
    #[cfg(feature = "component-model-async")]
    P3(
        wasmtime_wasi_http::p3::bindings::ProxyIndices,
        wasmtime::component::InstancePre<Host>,
    ),
}
742
743impl ProxyPre {
744 async fn instantiate(&self, store: &mut Store<Host>) -> Result<Proxy> {
745 Ok(match self {
746 ProxyPre::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
747 #[cfg(feature = "component-model-async")]
748 ProxyPre::P3(indices, pre) => {
749 let instance = pre.instantiate_async(&mut *store).await?;
750 let proxy = indices.load(&mut *store, &instance)?;
751 Proxy::P3(proxy, instance)
752 }
753 })
754 }
755}
756
/// An instantiated proxy component, in either the P2 or the P3 flavour.
enum Proxy {
    /// P2 bindings for the instance.
    P2(p2::Proxy),
    /// P3 bindings together with the instance they were loaded from.
    #[cfg(feature = "component-model-async")]
    P3(
        wasmtime_wasi_http::p3::bindings::Proxy,
        wasmtime::component::Instance,
    ),
}
765
766impl ProxyHandlerInner {
767 fn next_req_id(&self) -> u64 {
768 self.next_id.fetch_add(1, Ordering::Relaxed)
769 }
770}
771
/// Cloneable handle to the shared `ProxyHandlerInner`; a clone is given to
/// every connection task and request.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
774
775impl ProxyHandler {
776 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre) -> Self {
777 Self(Arc::new(ProxyHandlerInner {
778 cmd,
779 engine,
780 instance_pre,
781 next_id: AtomicU64::from(0),
782 }))
783 }
784}
785
/// Incoming HTTP request as delivered by hyper, with a streaming body.
type Request = hyper::Request<hyper::body::Incoming>;
787
788async fn handle_request(
789 ProxyHandler(inner): ProxyHandler,
790 req: Request,
791 component: Component,
792) -> Result<hyper::Response<BoxBody<Bytes, anyhow::Error>>> {
793 let (sender, receiver) = tokio::sync::oneshot::channel();
794
795 let req_id = inner.next_req_id();
796
797 log::info!(
798 "Request {req_id} handling {} to {}",
799 req.method(),
800 req.uri()
801 );
802
803 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
804
805 let (write_profile, epoch_thread) =
806 setup_epoch_handler(&inner.cmd, &mut store, component.clone())?;
807
808 match inner.instance_pre.instantiate(&mut store).await? {
809 Proxy::P2(proxy) => {
810 let req = store
811 .data_mut()
812 .new_incoming_request(p2::http::types::Scheme::Http, req)?;
813 let out = store.data_mut().new_response_outparam(sender)?;
814 let task = tokio::task::spawn(async move {
815 if let Err(e) = proxy
816 .wasi_http_incoming_handler()
817 .call_handle(&mut store, req, out)
818 .await
819 {
820 log::error!("[{req_id}] :: {e:?}");
821 return Err(e);
822 }
823
824 write_profile(&mut store);
825 drop(epoch_thread);
826
827 Ok(())
828 });
829
830 let result = match receiver.await {
831 Ok(Ok(resp)) => resp,
832 Ok(Err(e)) => bail!(e),
833 Err(_) => {
834 let e = match task.await {
842 Ok(Ok(())) => {
843 bail!("guest never invoked `response-outparam::set` method")
844 }
845 Ok(Err(e)) => e,
846 Err(e) => e.into(),
847 };
848 bail!(e.context("guest never invoked `response-outparam::set` method"))
849 }
850 };
851
852 Ok(result.map(|body| body.map_err(|e| e.into()).boxed()))
853 }
854 #[cfg(feature = "component-model-async")]
855 Proxy::P3(proxy, instance) => {
856 use wasmtime_wasi_http::p3::bindings::http::types::{ErrorCode, Request};
857
858 let (tx, rx) = tokio::sync::oneshot::channel();
859
860 tokio::task::spawn(async move {
861 let guest_result = instance
862 .run_concurrent(&mut store, async move |store| {
863 let (req, body) = req.into_parts();
864 let body = body.map_err(ErrorCode::from_hyper_request_error);
865 let req = http::Request::from_parts(req, body);
866 let (request, request_io_result) = Request::from_http(req);
867 let (res, task) = proxy.handle(store, request).await??;
868 let res =
869 store.with(|mut store| res.into_http(&mut store, request_io_result))?;
870
871 _ = tx.send(res);
872
873 task.block(store).await;
874 anyhow::Ok(())
875 })
876 .await?;
877 if let Err(e) = guest_result {
878 log::error!("[{req_id}] :: {e:?}");
879 return Err(e);
880 }
881
882 write_profile(&mut store);
883 drop(epoch_thread);
884
885 anyhow::Ok(())
886 });
887 Ok(rx.await?.map(|body| body.map_err(|err| err.into()).boxed()))
888 }
889 }
890}
891
/// Which of the process's standard streams guest output is forwarded to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    /// Writes all of `buf` to the selected standard stream, holding that
    /// stream's lock for the duration of the write.
    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
        use std::io::Write;

        match self {
            Output::Stdout => {
                let mut out = std::io::stdout().lock();
                out.write_all(buf)
            }
            Output::Stderr => {
                let mut err = std::io::stderr().lock();
                err.write_all(buf)
            }
        }
    }
}
908
/// Guest stdout/stderr sink that forwards to the host's standard streams,
/// writing a fixed prefix at the start of every line.
///
/// Clones share the same line-tracking state.
#[derive(Clone)]
struct LogStream {
    /// Host stream the bytes are forwarded to.
    output: Output,
    /// Prefix and line-start tracking, shared between clones.
    state: Arc<LogStreamState>,
}
914
/// State shared by all clones of one `LogStream`.
struct LogStreamState {
    /// Text written before the first byte of each line (may be empty).
    prefix: String,
    /// True when the next byte written starts a new line and so must be
    /// preceded by `prefix`.
    needs_prefix_on_next_write: AtomicBool,
}
919
920impl LogStream {
921 fn new(prefix: String, output: Output) -> LogStream {
922 LogStream {
923 output,
924 state: Arc::new(LogStreamState {
925 prefix,
926 needs_prefix_on_next_write: AtomicBool::new(true),
927 }),
928 }
929 }
930
931 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
932 while !bytes.is_empty() {
933 if self
934 .state
935 .needs_prefix_on_next_write
936 .load(Ordering::Relaxed)
937 {
938 self.output.write_all(self.state.prefix.as_bytes())?;
939 self.state
940 .needs_prefix_on_next_write
941 .store(false, Ordering::Relaxed);
942 }
943 match bytes.iter().position(|b| *b == b'\n') {
944 Some(i) => {
945 let (a, b) = bytes.split_at(i + 1);
946 bytes = b;
947 self.output.write_all(a)?;
948 self.state
949 .needs_prefix_on_next_write
950 .store(true, Ordering::Relaxed);
951 }
952 None => {
953 self.output.write_all(bytes)?;
954 break;
955 }
956 }
957 }
958
959 Ok(())
960 }
961}
962
963impl wasmtime_wasi::cli::StdoutStream for LogStream {
964 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
965 Box::new(self.clone())
966 }
967 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
968 Box::new(self.clone())
969 }
970}
971
972impl wasmtime_wasi::cli::IsTerminal for LogStream {
973 fn is_terminal(&self) -> bool {
974 match &self.output {
975 Output::Stdout => std::io::stdout().is_terminal(),
976 Output::Stderr => std::io::stderr().is_terminal(),
977 }
978 }
979}
980
981impl wasmtime_wasi::p2::OutputStream for LogStream {
982 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
983 self.write_all(&bytes)
984 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
985 Ok(())
986 }
987
988 fn flush(&mut self) -> StreamResult<()> {
989 Ok(())
990 }
991
992 fn check_write(&mut self) -> StreamResult<usize> {
993 Ok(1024 * 1024)
994 }
995}
996
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    /// Resolves immediately: the body is empty, so nothing is awaited.
    async fn ready(&mut self) {}
}
1001
1002impl AsyncWrite for LogStream {
1003 fn poll_write(
1004 mut self: Pin<&mut Self>,
1005 _cx: &mut Context<'_>,
1006 buf: &[u8],
1007 ) -> Poll<io::Result<usize>> {
1008 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
1009 }
1010 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1011 Poll::Ready(Ok(()))
1012 }
1013 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1014 Poll::Ready(Ok(()))
1015 }
1016}
1017
1018fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1040 use wasmtime::{Config, Memory, MemoryType};
1041 const BITS_TO_TEST: u32 = 42;
1042 let mut config = Config::new();
1043 config.wasm_memory64(true);
1044 config.memory_reservation(1 << BITS_TO_TEST);
1045 let engine = Engine::new(&config)?;
1046 let mut store = Store::new(&engine, ());
1047 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1050 if Memory::new(&mut store, ty).is_ok() {
1051 Ok(Some(true))
1052 } else {
1053 Ok(None)
1054 }
1055}