1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{anyhow, bail, Result};
3use clap::Parser;
4use std::net::SocketAddr;
5use std::{
6 path::PathBuf,
7 sync::{
8 atomic::{AtomicBool, AtomicU64, Ordering},
9 Arc,
10 },
11};
12use wasmtime::component::Linker;
13use wasmtime::{Engine, Store, StoreLimits};
14use wasmtime_wasi::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
15use wasmtime_wasi_http::bindings::http::types::Scheme;
16use wasmtime_wasi_http::bindings::ProxyPre;
17use wasmtime_wasi_http::io::TokioIo;
18use wasmtime_wasi_http::{
19 body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView, DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS,
20 DEFAULT_OUTGOING_BODY_CHUNK_SIZE,
21};
22
23#[cfg(feature = "wasi-config")]
24use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
25#[cfg(feature = "wasi-keyvalue")]
26use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
27#[cfg(feature = "wasi-nn")]
28use wasmtime_wasi_nn::wit::WasiNnCtx;
29
30struct Host {
31 table: wasmtime::component::ResourceTable,
32 ctx: WasiCtx,
33 http: WasiHttpCtx,
34 http_outgoing_body_buffer_chunks: Option<usize>,
35 http_outgoing_body_chunk_size: Option<usize>,
36
37 limits: StoreLimits,
38
39 #[cfg(feature = "wasi-nn")]
40 nn: Option<WasiNnCtx>,
41
42 #[cfg(feature = "wasi-config")]
43 wasi_config: Option<WasiConfigVariables>,
44
45 #[cfg(feature = "wasi-keyvalue")]
46 wasi_keyvalue: Option<WasiKeyValueCtx>,
47}
48
49impl IoView for Host {
50 fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
51 &mut self.table
52 }
53}
54impl WasiView for Host {
55 fn ctx(&mut self) -> &mut WasiCtx {
56 &mut self.ctx
57 }
58}
59
60impl WasiHttpView for Host {
61 fn ctx(&mut self) -> &mut WasiHttpCtx {
62 &mut self.http
63 }
64
65 fn outgoing_body_buffer_chunks(&mut self) -> usize {
66 self.http_outgoing_body_buffer_chunks
67 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
68 }
69
70 fn outgoing_body_chunk_size(&mut self) -> usize {
71 self.http_outgoing_body_chunk_size
72 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
73 }
74}
75
76const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
77 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
78 8080,
79);
80
81#[derive(Parser)]
83pub struct ServeCommand {
84 #[command(flatten)]
85 run: RunCommon,
86
87 #[arg(long = "addr", value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
89 addr: SocketAddr,
90
91 #[arg(long = "no-logging-prefix")]
94 no_logging_prefix: bool,
95
96 #[arg(value_name = "WASM", required = true)]
98 component: PathBuf,
99}
100
101impl ServeCommand {
102 pub fn execute(mut self) -> Result<()> {
104 self.run.common.init_logging()?;
105
106 if let Some(Profile::Guest { .. }) = &self.run.profile {
109 bail!("Cannot use the guest profiler with components");
110 }
111
112 if self.run.common.wasi.nn == Some(true) {
113 #[cfg(not(feature = "wasi-nn"))]
114 {
115 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
116 }
117 }
118
119 if self.run.common.wasi.threads == Some(true) {
120 bail!("wasi-threads does not support components yet")
121 }
122
123 if self.run.common.wasi.http.replace(true) == Some(false) {
126 bail!("wasi-http is required for the serve command, and must not be disabled");
127 }
128 if self.run.common.wasm.component_model.replace(true) == Some(false) {
129 bail!("components are required for the serve command, and must not be disabled");
130 }
131
132 let runtime = tokio::runtime::Builder::new_multi_thread()
133 .enable_time()
134 .enable_io()
135 .build()?;
136
137 runtime.block_on(async move {
138 tokio::select! {
139 _ = tokio::signal::ctrl_c() => {
140 Ok::<_, anyhow::Error>(())
141 }
142
143 res = self.serve() => {
144 res
145 }
146 }
147 })?;
148
149 Ok(())
150 }
151
152 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
153 let mut builder = WasiCtxBuilder::new();
154 self.run.configure_wasip2(&mut builder)?;
155
156 builder.env("REQUEST_ID", req_id.to_string());
157
158 let stdout_prefix: String;
159 let stderr_prefix: String;
160 if self.no_logging_prefix {
161 stdout_prefix = "".to_string();
162 stderr_prefix = "".to_string();
163 } else {
164 stdout_prefix = format!("stdout [{req_id}] :: ");
165 stderr_prefix = format!("stderr [{req_id}] :: ");
166 }
167 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
168 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
169
170 let mut host = Host {
171 table: wasmtime::component::ResourceTable::new(),
172 ctx: builder.build(),
173 http: WasiHttpCtx::new(),
174 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
175 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
176
177 limits: StoreLimits::default(),
178
179 #[cfg(feature = "wasi-nn")]
180 nn: None,
181 #[cfg(feature = "wasi-config")]
182 wasi_config: None,
183 #[cfg(feature = "wasi-keyvalue")]
184 wasi_keyvalue: None,
185 };
186
187 if self.run.common.wasi.nn == Some(true) {
188 #[cfg(feature = "wasi-nn")]
189 {
190 let graphs = self
191 .run
192 .common
193 .wasi
194 .nn_graph
195 .iter()
196 .map(|g| (g.format.clone(), g.dir.clone()))
197 .collect::<Vec<_>>();
198 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
199 host.nn.replace(WasiNnCtx::new(backends, registry));
200 }
201 }
202
203 if self.run.common.wasi.config == Some(true) {
204 #[cfg(feature = "wasi-config")]
205 {
206 let vars = WasiConfigVariables::from_iter(
207 self.run
208 .common
209 .wasi
210 .config_var
211 .iter()
212 .map(|v| (v.key.clone(), v.value.clone())),
213 );
214 host.wasi_config.replace(vars);
215 }
216 }
217
218 if self.run.common.wasi.keyvalue == Some(true) {
219 #[cfg(feature = "wasi-keyvalue")]
220 {
221 let ctx = WasiKeyValueCtxBuilder::new()
222 .in_memory_data(
223 self.run
224 .common
225 .wasi
226 .keyvalue_in_memory_data
227 .iter()
228 .map(|v| (v.key.clone(), v.value.clone())),
229 )
230 .build();
231 host.wasi_keyvalue.replace(ctx);
232 }
233 }
234
235 let mut store = Store::new(engine, host);
236
237 if self.run.common.wasm.timeout.is_some() {
238 store.set_epoch_deadline(u64::from(EPOCH_PRECISION) + 1);
239 }
240
241 store.data_mut().limits = self.run.store_limits();
242 store.limiter(|t| &mut t.limits);
243
244 if let Some(fuel) = self.run.common.wasm.fuel {
247 store.set_fuel(fuel)?;
248 }
249
250 Ok(store)
251 }
252
253 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
254 let mut cli = self.run.common.wasi.cli;
255
256 if let Some(common) = self.run.common.wasi.common {
258 if cli.is_some() {
259 bail!(
260 "The -Scommon option should not be use with -Scli as it is a deprecated alias"
261 );
262 } else {
263 cli = Some(common);
266 }
267 }
268
269 if cli == Some(true) {
278 let link_options = self.run.compute_wasi_features();
279 wasmtime_wasi::add_to_linker_with_options_async(linker, &link_options)?;
280 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
281 } else {
282 wasmtime_wasi_http::add_to_linker_async(linker)?;
283 }
284
285 if self.run.common.wasi.nn == Some(true) {
286 #[cfg(not(feature = "wasi-nn"))]
287 {
288 bail!("support for wasi-nn was disabled at compile time");
289 }
290 #[cfg(feature = "wasi-nn")]
291 {
292 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
293 let ctx = h.nn.as_mut().unwrap();
294 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
295 })?;
296 }
297 }
298
299 if self.run.common.wasi.config == Some(true) {
300 #[cfg(not(feature = "wasi-config"))]
301 {
302 bail!("support for wasi-config was disabled at compile time");
303 }
304 #[cfg(feature = "wasi-config")]
305 {
306 wasmtime_wasi_config::add_to_linker(linker, |h| {
307 WasiConfig::from(h.wasi_config.as_ref().unwrap())
308 })?;
309 }
310 }
311
312 if self.run.common.wasi.keyvalue == Some(true) {
313 #[cfg(not(feature = "wasi-keyvalue"))]
314 {
315 bail!("support for wasi-keyvalue was disabled at compile time");
316 }
317 #[cfg(feature = "wasi-keyvalue")]
318 {
319 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
320 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
321 })?;
322 }
323 }
324
325 if self.run.common.wasi.threads == Some(true) {
326 bail!("support for wasi-threads is not available with components");
327 }
328
329 if self.run.common.wasi.http == Some(false) {
330 bail!("support for wasi-http must be enabled for `serve` subcommand");
331 }
332
333 Ok(())
334 }
335
336 async fn serve(mut self) -> Result<()> {
337 use hyper::server::conn::http1;
338
339 let mut config = self
340 .run
341 .common
342 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
343 config.wasm_component_model(true);
344 config.async_support(true);
345
346 if self.run.common.wasm.timeout.is_some() {
347 config.epoch_interruption(true);
348 }
349
350 match self.run.profile {
351 Some(Profile::Native(s)) => {
352 config.profiler(s);
353 }
354
355 Some(Profile::Guest { .. }) => unreachable!(),
357
358 None => {}
359 }
360
361 let engine = Engine::new(&config)?;
362 let mut linker = Linker::new(&engine);
363
364 self.add_to_linker(&mut linker)?;
365
366 let component = match self.run.load_module(&engine, &self.component)? {
367 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
368 RunTarget::Component(c) => c,
369 };
370
371 let instance = linker.instantiate_pre(&component)?;
372 let instance = ProxyPre::new(instance)?;
373
374 let socket = match &self.addr {
375 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
376 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
377 };
378 socket.set_reuseaddr(!cfg!(windows))?;
387 socket.bind(self.addr)?;
388 let listener = socket.listen(100)?;
389
390 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
391
392 let _epoch_thread = if let Some(timeout) = self.run.common.wasm.timeout {
393 Some(EpochThread::spawn(
394 timeout / EPOCH_PRECISION,
395 engine.clone(),
396 ))
397 } else {
398 None
399 };
400
401 log::info!("Listening on {}", self.addr);
402
403 let handler = ProxyHandler::new(self, engine, instance);
404
405 loop {
406 let (stream, _) = listener.accept().await?;
407 let stream = TokioIo::new(stream);
408 let h = handler.clone();
409 tokio::task::spawn(async {
410 if let Err(e) = http1::Builder::new()
411 .keep_alive(true)
412 .serve_connection(
413 stream,
414 hyper::service::service_fn(move |req| handle_request(h.clone(), req)),
415 )
416 .await
417 {
418 eprintln!("error: {e:?}");
419 }
420 });
421 }
422 }
423}
424
425const EPOCH_PRECISION: u32 = 10;
430
431struct EpochThread {
432 shutdown: Arc<AtomicBool>,
433 handle: Option<std::thread::JoinHandle<()>>,
434}
435
436impl EpochThread {
437 fn spawn(timeout: std::time::Duration, engine: Engine) -> Self {
438 let shutdown = Arc::new(AtomicBool::new(false));
439 let handle = {
440 let shutdown = Arc::clone(&shutdown);
441 let handle = std::thread::spawn(move || {
442 while !shutdown.load(Ordering::Relaxed) {
443 std::thread::sleep(timeout);
444 engine.increment_epoch();
445 }
446 });
447 Some(handle)
448 };
449
450 EpochThread { shutdown, handle }
451 }
452}
453
454impl Drop for EpochThread {
455 fn drop(&mut self) {
456 if let Some(handle) = self.handle.take() {
457 self.shutdown.store(true, Ordering::Relaxed);
458 handle.join().unwrap();
459 }
460 }
461}
462
463struct ProxyHandlerInner {
464 cmd: ServeCommand,
465 engine: Engine,
466 instance_pre: ProxyPre<Host>,
467 next_id: AtomicU64,
468}
469
470impl ProxyHandlerInner {
471 fn next_req_id(&self) -> u64 {
472 self.next_id.fetch_add(1, Ordering::Relaxed)
473 }
474}
475
476#[derive(Clone)]
477struct ProxyHandler(Arc<ProxyHandlerInner>);
478
479impl ProxyHandler {
480 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
481 Self(Arc::new(ProxyHandlerInner {
482 cmd,
483 engine,
484 instance_pre,
485 next_id: AtomicU64::from(0),
486 }))
487 }
488}
489
490type Request = hyper::Request<hyper::body::Incoming>;
491
492async fn handle_request(
493 ProxyHandler(inner): ProxyHandler,
494 req: Request,
495) -> Result<hyper::Response<HyperOutgoingBody>> {
496 let (sender, receiver) = tokio::sync::oneshot::channel();
497
498 let req_id = inner.next_req_id();
499
500 log::info!(
501 "Request {req_id} handling {} to {}",
502 req.method(),
503 req.uri()
504 );
505
506 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
507
508 let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
509 let out = store.data_mut().new_response_outparam(sender)?;
510 let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
511
512 let task = tokio::task::spawn(async move {
513 if let Err(e) = proxy
514 .wasi_http_incoming_handler()
515 .call_handle(store, req, out)
516 .await
517 {
518 log::error!("[{req_id}] :: {:?}", e);
519 return Err(e);
520 }
521
522 Ok(())
523 });
524
525 match receiver.await {
526 Ok(Ok(resp)) => Ok(resp),
527 Ok(Err(e)) => Err(e.into()),
528 Err(_) => {
529 let e = match task.await {
537 Ok(r) => r.expect_err("if the receiver has an error, the task must have failed"),
538 Err(e) => e.into(),
539 };
540 return Err(e.context("guest never invoked `response-outparam::set` method"));
541 }
542 }
543}
544
545#[derive(Clone)]
546enum Output {
547 Stdout,
548 Stderr,
549}
550
551impl Output {
552 fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
553 use std::io::Write;
554
555 match self {
556 Output::Stdout => std::io::stdout().write_all(buf),
557 Output::Stderr => std::io::stderr().write_all(buf),
558 }
559 .map_err(|e| anyhow!(e))
560 }
561}
562
563#[derive(Clone)]
564struct LogStream {
565 prefix: String,
566 output: Output,
567 needs_prefix_on_next_write: bool,
568}
569
570impl LogStream {
571 fn new(prefix: String, output: Output) -> LogStream {
572 LogStream {
573 prefix,
574 output,
575 needs_prefix_on_next_write: true,
576 }
577 }
578}
579
580impl wasmtime_wasi::StdoutStream for LogStream {
581 fn stream(&self) -> Box<dyn wasmtime_wasi::OutputStream> {
582 Box::new(self.clone())
583 }
584
585 fn isatty(&self) -> bool {
586 use std::io::IsTerminal;
587
588 match &self.output {
589 Output::Stdout => std::io::stdout().is_terminal(),
590 Output::Stderr => std::io::stderr().is_terminal(),
591 }
592 }
593}
594
595impl wasmtime_wasi::OutputStream for LogStream {
596 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
597 let mut bytes = &bytes[..];
598
599 while !bytes.is_empty() {
600 if self.needs_prefix_on_next_write {
601 self.output
602 .write_all(self.prefix.as_bytes())
603 .map_err(StreamError::LastOperationFailed)?;
604 self.needs_prefix_on_next_write = false;
605 }
606 match bytes.iter().position(|b| *b == b'\n') {
607 Some(i) => {
608 let (a, b) = bytes.split_at(i + 1);
609 bytes = b;
610 self.output
611 .write_all(a)
612 .map_err(StreamError::LastOperationFailed)?;
613 self.needs_prefix_on_next_write = true;
614 }
615 None => {
616 self.output
617 .write_all(bytes)
618 .map_err(StreamError::LastOperationFailed)?;
619 break;
620 }
621 }
622 }
623
624 Ok(())
625 }
626
627 fn flush(&mut self) -> StreamResult<()> {
628 Ok(())
629 }
630
631 fn check_write(&mut self) -> StreamResult<usize> {
632 Ok(1024 * 1024)
633 }
634}
635
636#[async_trait::async_trait]
637impl wasmtime_wasi::Pollable for LogStream {
638 async fn ready(&mut self) {}
639}
640
641fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
663 use wasmtime::{Config, Memory, MemoryType};
664 const BITS_TO_TEST: u32 = 42;
665 let mut config = Config::new();
666 config.wasm_memory64(true);
667 config.memory_reservation(1 << BITS_TO_TEST);
668 let engine = Engine::new(&config)?;
669 let mut store = Store::new(&engine, ());
670 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
673 if Memory::new(&mut store, ty).is_ok() {
674 Ok(Some(true))
675 } else {
676 Ok(None)
677 }
678}