1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{anyhow, bail, Result};
3use clap::Parser;
4use std::net::SocketAddr;
5use std::{
6 path::PathBuf,
7 sync::{
8 atomic::{AtomicBool, AtomicU64, Ordering},
9 Arc,
10 },
11};
12use wasmtime::component::Linker;
13use wasmtime::{Config, Engine, Memory, MemoryType, Store, StoreLimits};
14use wasmtime_wasi::{StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
15use wasmtime_wasi_http::bindings::http::types::Scheme;
16use wasmtime_wasi_http::bindings::ProxyPre;
17use wasmtime_wasi_http::io::TokioIo;
18use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};
19
20#[cfg(feature = "wasi-keyvalue")]
21use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
22#[cfg(feature = "wasi-nn")]
23use wasmtime_wasi_nn::wit::WasiNnCtx;
24#[cfg(feature = "wasi-runtime-config")]
25use wasmtime_wasi_runtime_config::{WasiRuntimeConfig, WasiRuntimeConfigVariables};
26
/// Host-side state stored inside each `Store<Host>`.
///
/// `ServeCommand::new_store` builds one of these per request id and
/// `handle_request` creates a fresh store for every incoming request.
struct Host {
    /// Resource table handed out by both `WasiView::table` and
    /// `WasiHttpView::table`.
    table: wasmtime::component::ResourceTable,
    /// WASI context produced by the `WasiCtxBuilder` in `new_store`.
    ctx: WasiCtx,
    /// wasi-http context returned by `WasiHttpView::ctx`.
    http: WasiHttpCtx,

    /// Store limits; `new_store` registers this field as the store's limiter.
    limits: StoreLimits,

    /// wasi-nn state; filled in by `new_store` only when the `nn` option is
    /// `Some(true)`, otherwise left as `None`.
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    /// Runtime-config variables; filled in by `new_store` only when the
    /// `runtime_config` option is `Some(true)`.
    #[cfg(feature = "wasi-runtime-config")]
    wasi_runtime_config: Option<WasiRuntimeConfigVariables>,

    /// Key-value context; filled in by `new_store` only when the `keyvalue`
    /// option is `Some(true)`.
    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,
}
43
/// Gives the WASI implementation access to the state stored in `Host`.
impl WasiView for Host {
    /// Returns the resource table held in `self.table`.
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }

    /// Returns the WASI context held in `self.ctx`.
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.ctx
    }
}
53
/// Gives the wasi-http implementation access to the state stored in `Host`.
impl WasiHttpView for Host {
    /// Returns the same resource table as `WasiView::table` (`self.table`).
    fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
        &mut self.table
    }

    /// Returns the wasi-http context held in `self.http`.
    fn ctx(&mut self) -> &mut WasiHttpCtx {
        &mut self.http
    }
}
63
/// Default value of the `--addr` option: the unspecified IPv4 address
/// (`0.0.0.0`) on port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = {
    let any_v4 = std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
    std::net::SocketAddr::new(any_v4, 8080)
};
68
// Command-line arguments of the `serve` subcommand.
//
// NOTE: plain `//` comments are used on purpose in this block. With
// `#[derive(Parser)]`, `///` doc comments are picked up by clap as help text,
// so adding them here would change the program's `--help` output.
#[derive(Parser, PartialEq)]
pub struct ServeCommand {
    // Options from `crate::common::RunCommon`, flattened into this command's
    // own argument list.
    #[command(flatten)]
    run: RunCommon,

    // `--addr SOCKADDR`: socket address bound in `serve`; falls back to
    // `DEFAULT_ADDR` when not given.
    #[arg(long = "addr", value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR )]
    addr: SocketAddr,

    // Required positional `WASM` argument: the path passed to `load_module`
    // in `serve`.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}
83
84impl ServeCommand {
85 pub fn execute(mut self) -> Result<()> {
87 self.run.common.init_logging()?;
88
89 if let Some(Profile::Guest { .. }) = &self.run.profile {
92 bail!("Cannot use the guest profiler with components");
93 }
94
95 if self.run.common.wasi.nn == Some(true) {
96 #[cfg(not(feature = "wasi-nn"))]
97 {
98 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
99 }
100 }
101
102 if self.run.common.wasi.threads == Some(true) {
103 bail!("wasi-threads does not support components yet")
104 }
105
106 if self.run.common.wasi.http.replace(true) == Some(false) {
109 bail!("wasi-http is required for the serve command, and must not be disabled");
110 }
111 if self.run.common.wasm.component_model.replace(true) == Some(false) {
112 bail!("components are required for the serve command, and must not be disabled");
113 }
114
115 let runtime = tokio::runtime::Builder::new_multi_thread()
116 .enable_time()
117 .enable_io()
118 .build()?;
119
120 runtime.block_on(async move {
121 tokio::select! {
122 _ = tokio::signal::ctrl_c() => {
123 Ok::<_, anyhow::Error>(())
124 }
125
126 res = self.serve() => {
127 res
128 }
129 }
130 })?;
131
132 Ok(())
133 }
134
    /// Builds a fresh `Store<Host>` for the request identified by `req_id`.
    ///
    /// The store's WASI context gets a `REQUEST_ID` environment variable and
    /// stdout/stderr streams whose lines are prefixed with the request id.
    /// Optional wasi-nn / runtime-config / keyvalue state is attached when the
    /// matching option is `Some(true)` and the feature is compiled in.
    ///
    /// # Errors
    ///
    /// Propagates failures from `configure_wasip2`, `wasi_http_ctx`,
    /// `wasmtime_wasi_nn::preload` and `Store::set_fuel`.
    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        builder.env("REQUEST_ID", req_id.to_string());

        // Guest output goes through `LogStream`, which prepends the given
        // prefix to every line it writes.
        builder.stdout(LogStream::new(
            format!("stdout [{req_id}] :: "),
            Output::Stdout,
        ));

        builder.stderr(LogStream::new(
            format!("stderr [{req_id}] :: "),
            Output::Stderr,
        ));

        let mut table = wasmtime::component::ResourceTable::new();
        if let Some(max) = self.run.common.wasi.max_resources {
            table.set_max_capacity(max);
        }
        let mut host = Host {
            table,
            ctx: builder.build(),
            http: self.run.wasi_http_ctx()?,

            // Placeholder; overwritten with `self.run.store_limits()` once the
            // store exists (see below).
            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-runtime-config")]
            wasi_runtime_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
        };

        // Each optional context below is only built when the option is
        // enabled; without the cargo feature the `if` body compiles to nothing.
        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.runtime_config == Some(true) {
            #[cfg(feature = "wasi-runtime-config")]
            {
                let vars = WasiRuntimeConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .runtime_config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_runtime_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        // With a timeout configured, `serve` runs an `EpochThread` ticking
        // every `timeout / EPOCH_PRECISION`; the deadline here is
        // `EPOCH_PRECISION + 1` of those ticks.
        if self.run.common.wasm.timeout.is_some() {
            store.set_epoch_deadline(u64::from(EPOCH_PRECISION) + 1);
        }
        if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
            store.set_hostcall_fuel(fuel);
        }

        // Install the real limits and point the store's limiter at them.
        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }
238
239 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
240 let mut cli = self.run.common.wasi.cli;
241
242 if let Some(common) = self.run.common.wasi.common {
244 if cli.is_some() {
245 bail!(
246 "The -Scommon option should not be use with -Scli as it is a deprecated alias"
247 );
248 } else {
249 cli = Some(common);
252 }
253 }
254
255 if cli == Some(true) {
264 wasmtime_wasi::add_to_linker_async(linker)?;
265 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
266 } else {
267 wasmtime_wasi_http::add_to_linker_async(linker)?;
268 }
269
270 if self.run.common.wasi.nn == Some(true) {
271 #[cfg(not(feature = "wasi-nn"))]
272 {
273 bail!("support for wasi-nn was disabled at compile time");
274 }
275 #[cfg(feature = "wasi-nn")]
276 {
277 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
278 let ctx = h.nn.as_mut().unwrap();
279 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
280 })?;
281 }
282 }
283
284 if self.run.common.wasi.runtime_config == Some(true) {
285 #[cfg(not(feature = "wasi-runtime-config"))]
286 {
287 bail!("support for wasi-runtime-config was disabled at compile time");
288 }
289 #[cfg(feature = "wasi-runtime-config")]
290 {
291 wasmtime_wasi_runtime_config::add_to_linker(linker, |h| {
292 WasiRuntimeConfig::from(h.wasi_runtime_config.as_ref().unwrap())
293 })?;
294 }
295 }
296
297 if self.run.common.wasi.keyvalue == Some(true) {
298 #[cfg(not(feature = "wasi-keyvalue"))]
299 {
300 bail!("support for wasi-keyvalue was disabled at compile time");
301 }
302 #[cfg(feature = "wasi-keyvalue")]
303 {
304 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
305 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
306 })?;
307 }
308 }
309
310 if self.run.common.wasi.threads == Some(true) {
311 bail!("support for wasi-threads is not available with components");
312 }
313
314 if self.run.common.wasi.http == Some(false) {
315 bail!("support for wasi-http must be enabled for `serve` subcommand");
316 }
317
318 Ok(())
319 }
320
321 async fn serve(mut self) -> Result<()> {
322 use hyper::server::conn::http1;
323
324 let mut config = self
325 .run
326 .common
327 .config(None, use_pooling_allocator_by_default().unwrap_or(None))?;
328 config.wasm_component_model(true);
329 config.async_support(true);
330
331 if self.run.common.wasm.timeout.is_some() {
332 config.epoch_interruption(true);
333 }
334
335 match self.run.profile {
336 Some(Profile::Native(s)) => {
337 config.profiler(s);
338 }
339
340 Some(Profile::Guest { .. }) => unreachable!(),
342
343 None => {}
344 }
345
346 let engine = Engine::new(&config)?;
347 let mut linker = Linker::new(&engine);
348
349 self.add_to_linker(&mut linker)?;
350
351 let component = match self.run.load_module(&engine, &self.component)? {
352 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
353 RunTarget::Component(c) => c,
354 };
355
356 let instance = linker.instantiate_pre(&component)?;
357 let instance = ProxyPre::new(instance)?;
358
359 let socket = match &self.addr {
360 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
361 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
362 };
363 socket.set_reuseaddr(!cfg!(windows))?;
372 socket.bind(self.addr)?;
373 let listener = socket.listen(100)?;
374
375 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
376
377 let _epoch_thread = if let Some(timeout) = self.run.common.wasm.timeout {
378 Some(EpochThread::spawn(
379 timeout / EPOCH_PRECISION,
380 engine.clone(),
381 ))
382 } else {
383 None
384 };
385
386 log::info!("Listening on {}", self.addr);
387
388 let handler = ProxyHandler::new(self, engine, instance);
389
390 loop {
391 let (stream, _) = listener.accept().await?;
392 let stream = TokioIo::new(stream);
393 let h = handler.clone();
394 tokio::task::spawn(async {
395 if let Err(e) = http1::Builder::new()
396 .keep_alive(true)
397 .serve_connection(
398 stream,
399 hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
400 )
401 .await
402 {
403 eprintln!("error: {e:?}");
404 }
405 });
406 }
407 }
408}
409
/// Number of epoch ticks a configured timeout is divided into: `serve` ticks
/// the engine epoch every `timeout / EPOCH_PRECISION`, and `new_store` sets
/// each store's epoch deadline to `EPOCH_PRECISION + 1` ticks.
const EPOCH_PRECISION: u32 = 10;
415
/// Handle to the background thread that periodically increments the engine's
/// epoch. Dropping it signals the thread to stop and joins it.
struct EpochThread {
    /// Set to `true` by `Drop` to ask the thread to exit its loop.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the thread; taken (left as `None`) when dropped.
    handle: Option<std::thread::JoinHandle<()>>,
}
420
421impl EpochThread {
422 fn spawn(timeout: std::time::Duration, engine: Engine) -> Self {
423 let shutdown = Arc::new(AtomicBool::new(false));
424 let handle = {
425 let shutdown = Arc::clone(&shutdown);
426 let handle = std::thread::spawn(move || {
427 while !shutdown.load(Ordering::Relaxed) {
428 std::thread::sleep(timeout);
429 engine.increment_epoch();
430 }
431 });
432 Some(handle)
433 };
434
435 EpochThread { shutdown, handle }
436 }
437}
438
439impl Drop for EpochThread {
440 fn drop(&mut self) {
441 if let Some(handle) = self.handle.take() {
442 self.shutdown.store(true, Ordering::Relaxed);
443 handle.join().unwrap();
444 }
445 }
446}
447
/// State shared by all request handlers through `ProxyHandler`'s `Arc`.
struct ProxyHandlerInner {
    /// The parsed command; used to build a store for each request.
    cmd: ServeCommand,
    /// Engine passed to `new_store` for every request.
    engine: Engine,
    /// Pre-instantiated component, instantiated once per request.
    instance_pre: ProxyPre<Host>,
    /// Counter that hands out request ids (see `next_req_id`).
    next_id: AtomicU64,
}
454
impl ProxyHandlerInner {
    /// Returns the current value of the request counter and advances it by
    /// one (atomic `fetch_add` with relaxed ordering).
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
460
/// Cloneable handle to the shared `ProxyHandlerInner`; cloning only clones
/// the `Arc`.
#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
463
464impl ProxyHandler {
465 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
466 Self(Arc::new(ProxyHandlerInner {
467 cmd,
468 engine,
469 instance_pre,
470 next_id: AtomicU64::from(0),
471 }))
472 }
473}
474
/// An incoming hyper request whose body is still being received.
type Request = hyper::Request<hyper::body::Incoming>;
476
477async fn handle_request(
478 ProxyHandler(inner): ProxyHandler,
479 req: Request,
480) -> Result<hyper::Response<HyperOutgoingBody>> {
481 let (sender, receiver) = tokio::sync::oneshot::channel();
482
483 let req_id = inner.next_req_id();
484
485 log::info!(
486 "Request {req_id} handling {} to {}",
487 req.method(),
488 req.uri()
489 );
490
491 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
492
493 let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
494 let out = store.data_mut().new_response_outparam(sender)?;
495 let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
496
497 let task = tokio::task::spawn(async move {
498 if let Err(e) = proxy
499 .wasi_http_incoming_handler()
500 .call_handle(store, req, out)
501 .await
502 {
503 log::error!("[{req_id}] :: {:#?}", e);
504 return Err(e);
505 }
506
507 Ok(())
508 });
509
510 match receiver.await {
511 Ok(Ok(resp)) => Ok(resp),
512 Ok(Err(e)) => Err(e.into()),
513 Err(_) => {
514 let e = match task.await {
522 Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"),
523 Err(e) => e.into(),
524 };
525 bail!("guest never invoked `response-outparam::set` method: {e:?}")
526 }
527 }
528}
529
/// Selects which of the host process's standard streams a `LogStream`
/// writes to.
#[derive(Clone)]
enum Output {
    /// The process's standard output.
    Stdout,
    /// The process's standard error.
    Stderr,
}
535
536impl Output {
537 fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
538 use std::io::Write;
539
540 match self {
541 Output::Stdout => std::io::stdout().write_all(buf),
542 Output::Stderr => std::io::stderr().write_all(buf),
543 }
544 .map_err(|e| anyhow!(e))
545 }
546}
547
/// Output stream that forwards guest writes to the host's stdout or stderr,
/// inserting `prefix` at the start of every line.
#[derive(Clone)]
struct LogStream {
    /// Text written before the first byte of each line.
    prefix: String,
    /// Which host stream the bytes go to.
    output: Output,
    /// `true` when the next non-empty write must begin with `prefix`, i.e. at
    /// the very start and after each newline.
    needs_prefix_on_next_write: bool,
}
554
impl LogStream {
    /// Creates a stream writing to `output` that starts out needing a prefix,
    /// so the first bytes written are preceded by `prefix`.
    fn new(prefix: String, output: Output) -> LogStream {
        LogStream {
            prefix,
            output,
            needs_prefix_on_next_write: true,
        }
    }
}
564
impl wasmtime_wasi::StdoutStream for LogStream {
    /// Returns a boxed clone of this stream; the clone carries its own copy
    /// of the prefix state.
    fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
        Box::new(self.clone())
    }

    /// Reports whether the host stream selected by `self.output` is a
    /// terminal.
    fn isatty(&self) -> bool {
        use std::io::IsTerminal;

        match &self.output {
            Output::Stdout => std::io::stdout().is_terminal(),
            Output::Stderr => std::io::stderr().is_terminal(),
        }
    }
}
579
580impl wasmtime_wasi::HostOutputStream for LogStream {
581 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
582 let mut bytes = &bytes[..];
583
584 while !bytes.is_empty() {
585 if self.needs_prefix_on_next_write {
586 self.output
587 .write_all(self.prefix.as_bytes())
588 .map_err(StreamError::LastOperationFailed)?;
589 self.needs_prefix_on_next_write = false;
590 }
591 match bytes.iter().position(|b| *b == b'\n') {
592 Some(i) => {
593 let (a, b) = bytes.split_at(i + 1);
594 bytes = b;
595 self.output
596 .write_all(a)
597 .map_err(StreamError::LastOperationFailed)?;
598 self.needs_prefix_on_next_write = true;
599 }
600 None => {
601 self.output
602 .write_all(bytes)
603 .map_err(StreamError::LastOperationFailed)?;
604 break;
605 }
606 }
607 }
608
609 Ok(())
610 }
611
612 fn flush(&mut self) -> StreamResult<()> {
613 Ok(())
614 }
615
616 fn check_write(&mut self) -> StreamResult<usize> {
617 Ok(1024 * 1024)
618 }
619}
620
#[async_trait::async_trait]
impl wasmtime_wasi::Subscribe for LogStream {
    /// Completes immediately; the body is empty, so the stream never makes a
    /// caller wait.
    async fn ready(&mut self) {}
}
625
626fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
648 const BITS_TO_TEST: u32 = 42;
649 let mut config = Config::new();
650 config.wasm_memory64(true);
651 config.static_memory_maximum_size(1 << BITS_TO_TEST);
652 let engine = Engine::new(&config)?;
653 let mut store = Store::new(&engine, ());
654 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
657 if Memory::new(&mut store, ty).is_ok() {
658 Ok(Some(true))
659 } else {
660 Ok(None)
661 }
662}