1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{anyhow, bail, Result};
3use clap::Parser;
4use std::net::SocketAddr;
5use std::{
6 path::PathBuf,
7 sync::{
8 atomic::{AtomicBool, AtomicU64, Ordering},
9 Arc,
10 },
11};
12use wasmtime::component::Linker;
13use wasmtime::{Config, Engine, Memory, MemoryType, Store, StoreLimits};
14use wasmtime_wasi::{StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
15use wasmtime_wasi_http::bindings::http::types::Scheme;
16use wasmtime_wasi_http::bindings::ProxyPre;
17use wasmtime_wasi_http::io::TokioIo;
18use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};
19
20#[cfg(feature = "wasi-keyvalue")]
21use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
22#[cfg(feature = "wasi-nn")]
23use wasmtime_wasi_nn::wit::WasiNnCtx;
24#[cfg(feature = "wasi-runtime-config")]
25use wasmtime_wasi_runtime_config::{WasiRuntimeConfig, WasiRuntimeConfigVariables};
26
/// Per-store host state: the data value stored inside each `Store<Host>`.
struct Host {
    /// Resource table returned by both the `WasiView` and `WasiHttpView` impls.
    table: wasmtime::component::ResourceTable,
    /// WASI context returned by `WasiView::ctx`.
    ctx: WasiCtx,
    /// wasi-http context returned by `WasiHttpView::ctx`.
    http: WasiHttpCtx,

    /// Resource limits; installed as the store's limiter in `new_store`.
    limits: StoreLimits,

    /// wasi-nn context; `new_store` fills it in when the nn option is `Some(true)`.
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    /// Runtime-config variables; `new_store` fills them in when the
    /// runtime-config option is `Some(true)`.
    #[cfg(feature = "wasi-runtime-config")]
    wasi_runtime_config: Option<WasiRuntimeConfigVariables>,

    /// Key-value context; `new_store` fills it in when the keyvalue option is
    /// `Some(true)`.
    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,
}
43
/// Gives the WASI implementation access to the table and context held by `Host`.
impl WasiView for Host {
    /// Returns the resource table stored in the `table` field.
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }

    /// Returns the WASI context stored in the `ctx` field.
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.ctx
    }
}
53
/// Gives the wasi-http implementation access to the table and HTTP context
/// held by `Host`.
impl WasiHttpView for Host {
    /// Returns the resource table stored in the `table` field (the same table
    /// the `WasiView` impl returns).
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }

    /// Returns the wasi-http context stored in the `http` field.
    fn ctx(&mut self) -> &mut WasiHttpCtx {
        &mut self.http
    }
}
63
/// Default value of the `--addr` option: `0.0.0.0:8080`.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
    8080,
);
68
// Command-line arguments of the `serve` subcommand.
//
// NOTE(review): plain `//` comments are used here on purpose. `///` doc
// comments on a clap `Parser` struct or its fields become help text, so adding
// them would change the program's output.
#[derive(Parser, PartialEq)]
pub struct ServeCommand {
    // Options shared with other subcommands, flattened into this command's
    // argument list.
    #[command(flatten)]
    run: RunCommon,

    // Socket address the server binds to; defaults to `DEFAULT_ADDR`.
    #[arg(long = "addr", value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR )]
    addr: SocketAddr,

    // Path of the WebAssembly file to serve (required positional argument).
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}
83
84impl ServeCommand {
85 pub fn execute(mut self) -> Result<()> {
87 self.run.common.init_logging()?;
88
89 if let Some(Profile::Guest { .. }) = &self.run.profile {
92 bail!("Cannot use the guest profiler with components");
93 }
94
95 if self.run.common.wasi.nn == Some(true) {
96 #[cfg(not(feature = "wasi-nn"))]
97 {
98 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
99 }
100 }
101
102 if self.run.common.wasi.threads == Some(true) {
103 bail!("wasi-threads does not support components yet")
104 }
105
106 if self.run.common.wasi.http.replace(true) == Some(false) {
109 bail!("wasi-http is required for the serve command, and must not be disabled");
110 }
111 if self.run.common.wasm.component_model.replace(true) == Some(false) {
112 bail!("components are required for the serve command, and must not be disabled");
113 }
114
115 let runtime = tokio::runtime::Builder::new_multi_thread()
116 .enable_time()
117 .enable_io()
118 .build()?;
119
120 runtime.block_on(async move {
121 tokio::select! {
122 _ = tokio::signal::ctrl_c() => {
123 Ok::<_, anyhow::Error>(())
124 }
125
126 res = self.serve() => {
127 res
128 }
129 }
130 })?;
131
132 Ok(())
133 }
134
135 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
136 let mut builder = WasiCtxBuilder::new();
137 self.run.configure_wasip2(&mut builder)?;
138
139 builder.env("REQUEST_ID", req_id.to_string());
140
141 builder.stdout(LogStream::new(
142 format!("stdout [{req_id}] :: "),
143 Output::Stdout,
144 ));
145
146 builder.stderr(LogStream::new(
147 format!("stderr [{req_id}] :: "),
148 Output::Stderr,
149 ));
150
151 let mut host = Host {
152 table: wasmtime::component::ResourceTable::new(),
153 ctx: builder.build(),
154 http: WasiHttpCtx::new(),
155
156 limits: StoreLimits::default(),
157
158 #[cfg(feature = "wasi-nn")]
159 nn: None,
160 #[cfg(feature = "wasi-runtime-config")]
161 wasi_runtime_config: None,
162 #[cfg(feature = "wasi-keyvalue")]
163 wasi_keyvalue: None,
164 };
165
166 if self.run.common.wasi.nn == Some(true) {
167 #[cfg(feature = "wasi-nn")]
168 {
169 let graphs = self
170 .run
171 .common
172 .wasi
173 .nn_graph
174 .iter()
175 .map(|g| (g.format.clone(), g.dir.clone()))
176 .collect::<Vec<_>>();
177 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
178 host.nn.replace(WasiNnCtx::new(backends, registry));
179 }
180 }
181
182 if self.run.common.wasi.runtime_config == Some(true) {
183 #[cfg(feature = "wasi-runtime-config")]
184 {
185 let vars = WasiRuntimeConfigVariables::from_iter(
186 self.run
187 .common
188 .wasi
189 .runtime_config_var
190 .iter()
191 .map(|v| (v.key.clone(), v.value.clone())),
192 );
193 host.wasi_runtime_config.replace(vars);
194 }
195 }
196
197 if self.run.common.wasi.keyvalue == Some(true) {
198 #[cfg(feature = "wasi-keyvalue")]
199 {
200 let ctx = WasiKeyValueCtxBuilder::new()
201 .in_memory_data(
202 self.run
203 .common
204 .wasi
205 .keyvalue_in_memory_data
206 .iter()
207 .map(|v| (v.key.clone(), v.value.clone())),
208 )
209 .build();
210 host.wasi_keyvalue.replace(ctx);
211 }
212 }
213
214 let mut store = Store::new(engine, host);
215
216 if self.run.common.wasm.timeout.is_some() {
217 store.set_epoch_deadline(u64::from(EPOCH_PRECISION) + 1);
218 }
219
220 store.data_mut().limits = self.run.store_limits();
221 store.limiter(|t| &mut t.limits);
222
223 if let Some(fuel) = self.run.common.wasm.fuel {
226 store.set_fuel(fuel)?;
227 }
228
229 Ok(store)
230 }
231
232 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
233 let mut cli = self.run.common.wasi.cli;
234
235 if let Some(common) = self.run.common.wasi.common {
237 if cli.is_some() {
238 bail!(
239 "The -Scommon option should not be use with -Scli as it is a deprecated alias"
240 );
241 } else {
242 cli = Some(common);
245 }
246 }
247
248 if cli == Some(true) {
257 wasmtime_wasi::add_to_linker_async(linker)?;
258 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
259 } else {
260 wasmtime_wasi_http::add_to_linker_async(linker)?;
261 }
262
263 if self.run.common.wasi.nn == Some(true) {
264 #[cfg(not(feature = "wasi-nn"))]
265 {
266 bail!("support for wasi-nn was disabled at compile time");
267 }
268 #[cfg(feature = "wasi-nn")]
269 {
270 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
271 let ctx = h.nn.as_mut().unwrap();
272 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
273 })?;
274 }
275 }
276
277 if self.run.common.wasi.runtime_config == Some(true) {
278 #[cfg(not(feature = "wasi-runtime-config"))]
279 {
280 bail!("support for wasi-runtime-config was disabled at compile time");
281 }
282 #[cfg(feature = "wasi-runtime-config")]
283 {
284 wasmtime_wasi_runtime_config::add_to_linker(linker, |h| {
285 WasiRuntimeConfig::from(h.wasi_runtime_config.as_ref().unwrap())
286 })?;
287 }
288 }
289
290 if self.run.common.wasi.keyvalue == Some(true) {
291 #[cfg(not(feature = "wasi-keyvalue"))]
292 {
293 bail!("support for wasi-keyvalue was disabled at compile time");
294 }
295 #[cfg(feature = "wasi-keyvalue")]
296 {
297 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
298 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
299 })?;
300 }
301 }
302
303 if self.run.common.wasi.threads == Some(true) {
304 bail!("support for wasi-threads is not available with components");
305 }
306
307 if self.run.common.wasi.http == Some(false) {
308 bail!("support for wasi-http must be enabled for `serve` subcommand");
309 }
310
311 Ok(())
312 }
313
314 async fn serve(mut self) -> Result<()> {
315 use hyper::server::conn::http1;
316
317 let mut config = self
318 .run
319 .common
320 .config(None, use_pooling_allocator_by_default().unwrap_or(None))?;
321 config.wasm_component_model(true);
322 config.async_support(true);
323
324 if self.run.common.wasm.timeout.is_some() {
325 config.epoch_interruption(true);
326 }
327
328 match self.run.profile {
329 Some(Profile::Native(s)) => {
330 config.profiler(s);
331 }
332
333 Some(Profile::Guest { .. }) => unreachable!(),
335
336 None => {}
337 }
338
339 let engine = Engine::new(&config)?;
340 let mut linker = Linker::new(&engine);
341
342 self.add_to_linker(&mut linker)?;
343
344 let component = match self.run.load_module(&engine, &self.component)? {
345 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
346 RunTarget::Component(c) => c,
347 };
348
349 let instance = linker.instantiate_pre(&component)?;
350 let instance = ProxyPre::new(instance)?;
351
352 let socket = match &self.addr {
353 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
354 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
355 };
356 socket.set_reuseaddr(!cfg!(windows))?;
365 socket.bind(self.addr)?;
366 let listener = socket.listen(100)?;
367
368 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
369
370 let _epoch_thread = if let Some(timeout) = self.run.common.wasm.timeout {
371 Some(EpochThread::spawn(
372 timeout / EPOCH_PRECISION,
373 engine.clone(),
374 ))
375 } else {
376 None
377 };
378
379 log::info!("Listening on {}", self.addr);
380
381 let handler = ProxyHandler::new(self, engine, instance);
382
383 loop {
384 let (stream, _) = listener.accept().await?;
385 let stream = TokioIo::new(stream);
386 let h = handler.clone();
387 tokio::task::spawn(async {
388 if let Err(e) = http1::Builder::new()
389 .keep_alive(true)
390 .serve_connection(
391 stream,
392 hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
393 )
394 .await
395 {
396 eprintln!("error: {e:?}");
397 }
398 });
399 }
400 }
401}
402
/// Number of epoch ticks a configured timeout is divided into: the epoch
/// thread ticks every `timeout / EPOCH_PRECISION`, and each store's deadline
/// is set to `EPOCH_PRECISION + 1` ticks.
const EPOCH_PRECISION: u32 = 10;
408
/// Handle to the background thread that periodically increments the engine's
/// epoch; dropping it asks the thread to stop and joins it.
struct EpochThread {
    /// Set to `true` on drop to tell the thread to exit its loop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; taken (left as `None`) when dropped.
    handle: Option<std::thread::JoinHandle<()>>,
}
413
414impl EpochThread {
415 fn spawn(timeout: std::time::Duration, engine: Engine) -> Self {
416 let shutdown = Arc::new(AtomicBool::new(false));
417 let handle = {
418 let shutdown = Arc::clone(&shutdown);
419 let handle = std::thread::spawn(move || {
420 while !shutdown.load(Ordering::Relaxed) {
421 std::thread::sleep(timeout);
422 engine.increment_epoch();
423 }
424 });
425 Some(handle)
426 };
427
428 EpochThread { shutdown, handle }
429 }
430}
431
432impl Drop for EpochThread {
433 fn drop(&mut self) {
434 if let Some(handle) = self.handle.take() {
435 self.shutdown.store(true, Ordering::Relaxed);
436 handle.join().unwrap();
437 }
438 }
439}
440
/// State shared by all request handlers (held behind the `Arc` in `ProxyHandler`).
struct ProxyHandlerInner {
    /// The parsed command; used to build a new store for each request.
    cmd: ServeCommand,
    /// Engine in which per-request stores are created.
    engine: Engine,
    /// Pre-instantiated proxy component, instantiated once per request.
    instance_pre: ProxyPre<Host>,
    /// Counter from which request ids are drawn.
    next_id: AtomicU64,
}
447
impl ProxyHandlerInner {
    /// Returns the current value of the request counter and increments it by
    /// one, so successive calls yield successive ids.
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
453
/// Reference-counted handle to the shared handler state; cloning it only
/// clones the `Arc`.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
456
457impl ProxyHandler {
458 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
459 Self(Arc::new(ProxyHandlerInner {
460 cmd,
461 engine,
462 instance_pre,
463 next_id: AtomicU64::from(0),
464 }))
465 }
466}
467
/// Incoming hyper request with a streaming body, as passed to `handle_request`.
type Request = hyper::Request<hyper::body::Incoming>;
469
470async fn handle_request(
471 ProxyHandler(inner): ProxyHandler,
472 req: Request,
473) -> Result<hyper::Response<HyperOutgoingBody>> {
474 let (sender, receiver) = tokio::sync::oneshot::channel();
475
476 let req_id = inner.next_req_id();
477
478 log::info!(
479 "Request {req_id} handling {} to {}",
480 req.method(),
481 req.uri()
482 );
483
484 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
485
486 let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
487 let out = store.data_mut().new_response_outparam(sender)?;
488 let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
489
490 let task = tokio::task::spawn(async move {
491 if let Err(e) = proxy
492 .wasi_http_incoming_handler()
493 .call_handle(store, req, out)
494 .await
495 {
496 log::error!("[{req_id}] :: {:#?}", e);
497 return Err(e);
498 }
499
500 Ok(())
501 });
502
503 match receiver.await {
504 Ok(Ok(resp)) => Ok(resp),
505 Ok(Err(e)) => Err(e.into()),
506 Err(_) => {
507 let e = match task.await {
515 Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"),
516 Err(e) => e.into(),
517 };
518 bail!("guest never invoked `response-outparam::set` method: {e:?}")
519 }
520 }
521}
522
/// Selects which of the host process's standard streams a `LogStream` writes to.
#[derive(Clone)]
enum Output {
    /// The host's standard output.
    Stdout,
    /// The host's standard error.
    Stderr,
}
528
529impl Output {
530 fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
531 use std::io::Write;
532
533 match self {
534 Output::Stdout => std::io::stdout().write_all(buf),
535 Output::Stderr => std::io::stderr().write_all(buf),
536 }
537 .map_err(|e| anyhow!(e))
538 }
539}
540
/// Output stream handed to the guest that forwards bytes to the host's
/// stdout/stderr, writing `prefix` at the start of every line.
#[derive(Clone)]
struct LogStream {
    /// Text written before the first byte of each line.
    prefix: String,
    /// Host stream that receives the bytes.
    output: Output,
    /// `true` when the next non-empty write starts a new line and therefore
    /// has to be preceded by `prefix`.
    needs_prefix_on_next_write: bool,
}
547
548impl LogStream {
549 fn new(prefix: String, output: Output) -> LogStream {
550 LogStream {
551 prefix,
552 output,
553 needs_prefix_on_next_write: true,
554 }
555 }
556}
557
558impl wasmtime_wasi::StdoutStream for LogStream {
559 fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
560 Box::new(self.clone())
561 }
562
563 fn isatty(&self) -> bool {
564 use std::io::IsTerminal;
565
566 match &self.output {
567 Output::Stdout => std::io::stdout().is_terminal(),
568 Output::Stderr => std::io::stderr().is_terminal(),
569 }
570 }
571}
572
573impl wasmtime_wasi::HostOutputStream for LogStream {
574 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
575 let mut bytes = &bytes[..];
576
577 while !bytes.is_empty() {
578 if self.needs_prefix_on_next_write {
579 self.output
580 .write_all(self.prefix.as_bytes())
581 .map_err(StreamError::LastOperationFailed)?;
582 self.needs_prefix_on_next_write = false;
583 }
584 match bytes.iter().position(|b| *b == b'\n') {
585 Some(i) => {
586 let (a, b) = bytes.split_at(i + 1);
587 bytes = b;
588 self.output
589 .write_all(a)
590 .map_err(StreamError::LastOperationFailed)?;
591 self.needs_prefix_on_next_write = true;
592 }
593 None => {
594 self.output
595 .write_all(bytes)
596 .map_err(StreamError::LastOperationFailed)?;
597 break;
598 }
599 }
600 }
601
602 Ok(())
603 }
604
605 fn flush(&mut self) -> StreamResult<()> {
606 Ok(())
607 }
608
609 fn check_write(&mut self) -> StreamResult<usize> {
610 Ok(1024 * 1024)
611 }
612}
613
#[async_trait::async_trait]
impl wasmtime_wasi::Subscribe for LogStream {
    /// Completes immediately: the body is empty, so there is nothing to wait for.
    async fn ready(&mut self) {}
}
618
619fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
641 const BITS_TO_TEST: u32 = 42;
642 let mut config = Config::new();
643 config.wasm_memory64(true);
644 config.static_memory_maximum_size(1 << BITS_TO_TEST);
645 let engine = Engine::new(&config)?;
646 let mut store = Store::new(&engine, ());
647 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
650 if Memory::new(&mut store, ty).is_ok() {
651 Ok(Some(true))
652 } else {
653 Ok(None)
654 }
655}