1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use clap::Parser;
4use http::{Response, StatusCode};
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Instant;
10use std::{
11 path::PathBuf,
12 sync::{
13 Arc, Mutex,
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 },
16 time::Duration,
17};
18use tokio::io::{self, AsyncWrite};
19use tokio::sync::Notify;
20use wasmtime::component::{Component, Linker, ResourceTable};
21use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
22use wasmtime_wasi::p2::{StreamError, StreamResult};
23use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
24use wasmtime_wasi_http::bindings::ProxyPre;
25use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
26use wasmtime_wasi_http::io::TokioIo;
27use wasmtime_wasi_http::{
28 DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
29 WasiHttpView, body::HyperOutgoingBody,
30};
31
32#[cfg(feature = "wasi-config")]
33use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
34#[cfg(feature = "wasi-keyvalue")]
35use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
36#[cfg(feature = "wasi-nn")]
37use wasmtime_wasi_nn::wit::WasiNnCtx;
38
39struct Host {
40 table: wasmtime::component::ResourceTable,
41 ctx: WasiCtx,
42 http: WasiHttpCtx,
43 http_outgoing_body_buffer_chunks: Option<usize>,
44 http_outgoing_body_chunk_size: Option<usize>,
45
46 limits: StoreLimits,
47
48 #[cfg(feature = "wasi-nn")]
49 nn: Option<WasiNnCtx>,
50
51 #[cfg(feature = "wasi-config")]
52 wasi_config: Option<WasiConfigVariables>,
53
54 #[cfg(feature = "wasi-keyvalue")]
55 wasi_keyvalue: Option<WasiKeyValueCtx>,
56
57 #[cfg(feature = "profiling")]
58 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
59}
60
61impl WasiView for Host {
62 fn ctx(&mut self) -> WasiCtxView<'_> {
63 WasiCtxView {
64 ctx: &mut self.ctx,
65 table: &mut self.table,
66 }
67 }
68}
69
70impl WasiHttpView for Host {
71 fn ctx(&mut self) -> &mut WasiHttpCtx {
72 &mut self.http
73 }
74 fn table(&mut self) -> &mut ResourceTable {
75 &mut self.table
76 }
77
78 fn outgoing_body_buffer_chunks(&mut self) -> usize {
79 self.http_outgoing_body_buffer_chunks
80 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
81 }
82
83 fn outgoing_body_chunk_size(&mut self) -> usize {
84 self.http_outgoing_body_chunk_size
85 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
86 }
87}
88
89const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
90 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
91 8080,
92);
93
94#[derive(Parser)]
96pub struct ServeCommand {
97 #[command(flatten)]
98 run: RunCommon,
99
100 #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
102 addr: SocketAddr,
103
104 #[arg(long, value_name = "SOCKADDR")]
109 shutdown_addr: Option<SocketAddr>,
110
111 #[arg(long)]
114 no_logging_prefix: bool,
115
116 #[arg(value_name = "WASM", required = true)]
118 component: PathBuf,
119}
120
121impl ServeCommand {
122 pub fn execute(mut self) -> Result<()> {
124 self.run.common.init_logging()?;
125
126 if self.run.common.wasi.nn == Some(true) {
130 #[cfg(not(feature = "wasi-nn"))]
131 {
132 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
133 }
134 }
135
136 if self.run.common.wasi.threads == Some(true) {
137 bail!("wasi-threads does not support components yet")
138 }
139
140 if self.run.common.wasi.http.replace(true) == Some(false) {
143 bail!("wasi-http is required for the serve command, and must not be disabled");
144 }
145 if self.run.common.wasm.component_model.replace(true) == Some(false) {
146 bail!("components are required for the serve command, and must not be disabled");
147 }
148
149 let runtime = tokio::runtime::Builder::new_multi_thread()
150 .enable_time()
151 .enable_io()
152 .build()?;
153
154 runtime.block_on(self.serve())?;
155
156 Ok(())
157 }
158
159 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
160 let mut builder = WasiCtxBuilder::new();
161 self.run.configure_wasip2(&mut builder)?;
162
163 builder.env("REQUEST_ID", req_id.to_string());
164
165 let stdout_prefix: String;
166 let stderr_prefix: String;
167 if self.no_logging_prefix {
168 stdout_prefix = "".to_string();
169 stderr_prefix = "".to_string();
170 } else {
171 stdout_prefix = format!("stdout [{req_id}] :: ");
172 stderr_prefix = format!("stderr [{req_id}] :: ");
173 }
174 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
175 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
176
177 let mut table = wasmtime::component::ResourceTable::new();
178 if let Some(max) = self.run.common.wasi.max_resources {
179 table.set_max_capacity(max);
180 }
181 let mut host = Host {
182 table,
183 ctx: builder.build(),
184 http: self.run.wasi_http_ctx()?,
185 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
186 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
187
188 limits: StoreLimits::default(),
189
190 #[cfg(feature = "wasi-nn")]
191 nn: None,
192 #[cfg(feature = "wasi-config")]
193 wasi_config: None,
194 #[cfg(feature = "wasi-keyvalue")]
195 wasi_keyvalue: None,
196 #[cfg(feature = "profiling")]
197 guest_profiler: None,
198 };
199
200 if self.run.common.wasi.nn == Some(true) {
201 #[cfg(feature = "wasi-nn")]
202 {
203 let graphs = self
204 .run
205 .common
206 .wasi
207 .nn_graph
208 .iter()
209 .map(|g| (g.format.clone(), g.dir.clone()))
210 .collect::<Vec<_>>();
211 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
212 host.nn.replace(WasiNnCtx::new(backends, registry));
213 }
214 }
215
216 if self.run.common.wasi.config == Some(true) {
217 #[cfg(feature = "wasi-config")]
218 {
219 let vars = WasiConfigVariables::from_iter(
220 self.run
221 .common
222 .wasi
223 .config_var
224 .iter()
225 .map(|v| (v.key.clone(), v.value.clone())),
226 );
227 host.wasi_config.replace(vars);
228 }
229 }
230
231 if self.run.common.wasi.keyvalue == Some(true) {
232 #[cfg(feature = "wasi-keyvalue")]
233 {
234 let ctx = WasiKeyValueCtxBuilder::new()
235 .in_memory_data(
236 self.run
237 .common
238 .wasi
239 .keyvalue_in_memory_data
240 .iter()
241 .map(|v| (v.key.clone(), v.value.clone())),
242 )
243 .build();
244 host.wasi_keyvalue.replace(ctx);
245 }
246 }
247
248 let mut store = Store::new(engine, host);
249
250 if let Some(fuel) = self.run.common.wasi.hostcall_fuel {
251 store.set_hostcall_fuel(fuel);
252 }
253
254 store.data_mut().limits = self.run.store_limits();
255 store.limiter(|t| &mut t.limits);
256
257 if let Some(fuel) = self.run.common.wasm.fuel {
260 store.set_fuel(fuel)?;
261 }
262
263 Ok(store)
264 }
265
266 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
267 let mut cli = self.run.common.wasi.cli;
268
269 if let Some(common) = self.run.common.wasi.common {
271 if cli.is_some() {
272 bail!(
273 "The -Scommon option should not be use with -Scli as it is a deprecated alias"
274 );
275 } else {
276 cli = Some(common);
279 }
280 }
281
282 if cli == Some(true) {
291 let link_options = self.run.compute_wasi_features();
292 wasmtime_wasi::p2::add_to_linker_with_options_async(linker, &link_options)?;
293 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
294 } else {
295 wasmtime_wasi_http::add_to_linker_async(linker)?;
296 }
297
298 if self.run.common.wasi.nn == Some(true) {
299 #[cfg(not(feature = "wasi-nn"))]
300 {
301 bail!("support for wasi-nn was disabled at compile time");
302 }
303 #[cfg(feature = "wasi-nn")]
304 {
305 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
306 let ctx = h.nn.as_mut().unwrap();
307 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
308 })?;
309 }
310 }
311
312 if self.run.common.wasi.config == Some(true) {
313 #[cfg(not(feature = "wasi-config"))]
314 {
315 bail!("support for wasi-config was disabled at compile time");
316 }
317 #[cfg(feature = "wasi-config")]
318 {
319 wasmtime_wasi_config::add_to_linker(linker, |h| {
320 WasiConfig::from(h.wasi_config.as_ref().unwrap())
321 })?;
322 }
323 }
324
325 if self.run.common.wasi.keyvalue == Some(true) {
326 #[cfg(not(feature = "wasi-keyvalue"))]
327 {
328 bail!("support for wasi-keyvalue was disabled at compile time");
329 }
330 #[cfg(feature = "wasi-keyvalue")]
331 {
332 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
333 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
334 })?;
335 }
336 }
337
338 if self.run.common.wasi.threads == Some(true) {
339 bail!("support for wasi-threads is not available with components");
340 }
341
342 if self.run.common.wasi.http == Some(false) {
343 bail!("support for wasi-http must be enabled for `serve` subcommand");
344 }
345
346 Ok(())
347 }
348
349 async fn serve(mut self) -> Result<()> {
350 use hyper::server::conn::http1;
351
352 let mut config = self
353 .run
354 .common
355 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
356 config.wasm_component_model(true);
357 config.async_support(true);
358
359 if self.run.common.wasm.timeout.is_some() {
360 config.epoch_interruption(true);
361 }
362
363 match self.run.profile {
364 Some(Profile::Native(s)) => {
365 config.profiler(s);
366 }
367 Some(Profile::Guest { .. }) => {
368 config.epoch_interruption(true);
369 }
370 None => {}
371 }
372
373 let engine = Engine::new(&config)?;
374 let mut linker = Linker::new(&engine);
375
376 self.add_to_linker(&mut linker)?;
377
378 let component = match self.run.load_module(&engine, &self.component)? {
379 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
380 RunTarget::Component(c) => c,
381 };
382
383 let instance = linker.instantiate_pre(&component)?;
384 let instance = ProxyPre::new(instance)?;
385
386 let shutdown = Arc::new(GracefulShutdown::default());
390 tokio::task::spawn({
391 let shutdown = shutdown.clone();
392 async move {
393 tokio::signal::ctrl_c().await.unwrap();
394 shutdown.requested.notify_one();
395 }
396 });
397 if let Some(addr) = self.shutdown_addr {
398 let listener = tokio::net::TcpListener::bind(addr).await?;
399 eprintln!(
400 "Listening for shutdown on tcp://{}/",
401 listener.local_addr()?
402 );
403 let shutdown = shutdown.clone();
404 tokio::task::spawn(async move {
405 let _ = listener.accept().await;
406 shutdown.requested.notify_one();
407 });
408 }
409
410 let socket = match &self.addr {
411 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
412 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
413 };
414 socket.set_reuseaddr(!cfg!(windows))?;
423 socket.bind(self.addr)?;
424 let listener = socket.listen(100)?;
425
426 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
427
428 log::info!("Listening on {}", self.addr);
429
430 let handler = ProxyHandler::new(self, engine, instance);
431
432 loop {
433 let (stream, _) = tokio::select! {
437 _ = shutdown.requested.notified() => break,
438 v = listener.accept() => v?,
439 };
440 let comp = component.clone();
441 let stream = TokioIo::new(stream);
442 let h = handler.clone();
443 let shutdown_guard = shutdown.clone().increment();
444 tokio::task::spawn(async move {
445 if let Err(e) = http1::Builder::new()
446 .keep_alive(true)
447 .serve_connection(
448 stream,
449 hyper::service::service_fn(move |req| {
450 let comp = comp.clone();
451 let h = h.clone();
452 async move {
453 use http_body_util::{BodyExt, Full};
454 fn to_errorcode(_: Infallible) -> ErrorCode {
455 unreachable!()
456 }
457 match handle_request(h, req, comp).await {
458 Ok(r) => Ok::<_, Infallible>(r),
459 Err(e) => {
460 eprintln!("error: {e:?}");
461 let error_html = "\
462<!doctype html>
463<html>
464<head>
465 <title>500 Internal Server Error</title>
466</head>
467<body>
468 <center>
469 <h1>500 Internal Server Error</h1>
470 <hr>
471 wasmtime
472 </center>
473</body>
474</html>";
475 Ok(Response::builder()
476 .status(StatusCode::INTERNAL_SERVER_ERROR)
477 .header("Content-Type", "text/html; charset=UTF-8")
478 .body(
479 Full::new(bytes::Bytes::from(error_html))
480 .map_err(to_errorcode)
481 .boxed(),
482 )
483 .unwrap())
484 }
485 }
486 }
487 }),
488 )
489 .await
490 {
491 eprintln!("error: {e:?}");
492 }
493 drop(shutdown_guard);
494 });
495 }
496
497 if shutdown.close() {
503 return Ok(());
504 }
505 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
506 tokio::select! {
507 _ = tokio::signal::ctrl_c() => {}
508 _ = shutdown.complete.notified() => {}
509 }
510
511 Ok(())
512 }
513}
514
515#[derive(Default)]
517struct GracefulShutdown {
518 requested: Notify,
520 complete: Notify,
523 state: Mutex<GracefulShutdownState>,
525}
526
527#[derive(Default)]
528struct GracefulShutdownState {
529 active_tasks: u32,
530 notify_when_done: bool,
531}
532
533impl GracefulShutdown {
534 fn increment(self: Arc<Self>) -> impl Drop {
536 struct Guard(Arc<GracefulShutdown>);
537
538 let mut state = self.state.lock().unwrap();
539 assert!(!state.notify_when_done);
540 state.active_tasks += 1;
541 drop(state);
542
543 return Guard(self);
544
545 impl Drop for Guard {
546 fn drop(&mut self) {
547 let mut state = self.0.state.lock().unwrap();
548 state.active_tasks -= 1;
549 if state.notify_when_done && state.active_tasks == 0 {
550 self.0.complete.notify_one();
551 }
552 }
553 }
554 }
555
556 fn close(&self) -> bool {
559 let mut state = self.state.lock().unwrap();
560 state.notify_when_done = true;
561 state.active_tasks == 0
562 }
563}
564
565const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
569
570struct EpochThread {
571 shutdown: Arc<AtomicBool>,
572 handle: Option<std::thread::JoinHandle<()>>,
573}
574
575impl EpochThread {
576 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
577 let shutdown = Arc::new(AtomicBool::new(false));
578 let handle = {
579 let shutdown = Arc::clone(&shutdown);
580 let handle = std::thread::spawn(move || {
581 while !shutdown.load(Ordering::Relaxed) {
582 std::thread::sleep(interval);
583 engine.increment_epoch();
584 }
585 });
586 Some(handle)
587 };
588
589 EpochThread { shutdown, handle }
590 }
591}
592
593impl Drop for EpochThread {
594 fn drop(&mut self) {
595 if let Some(handle) = self.handle.take() {
596 self.shutdown.store(true, Ordering::Relaxed);
597 handle.join().unwrap();
598 }
599 }
600}
601
602type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
603
604fn setup_epoch_handler(
605 cmd: &ServeCommand,
606 store: &mut Store<Host>,
607 component: Component,
608) -> Result<(WriteProfile, Option<EpochThread>)> {
609 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
611 #[cfg(feature = "profiling")]
612 return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
613 #[cfg(not(feature = "profiling"))]
614 {
615 let _ = (path, interval);
616 bail!("support for profiling disabled at compile time!");
617 }
618 }
619
620 let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
622 let start = Instant::now();
623 store.epoch_deadline_callback(move |_store| {
624 if start.elapsed() > timeout {
625 bail!("Timeout expired");
626 }
627 Ok(UpdateDeadline::Continue(1))
628 });
629 store.set_epoch_deadline(1);
630 let engine = store.engine().clone();
631 Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
632 } else {
633 None
634 };
635
636 Ok((Box::new(|_store| {}), epoch_thread))
637}
638
639#[cfg(feature = "profiling")]
640fn setup_guest_profiler(
641 cmd: &ServeCommand,
642 store: &mut Store<Host>,
643 path: String,
644 interval: Duration,
645 component: Component,
646) -> Result<(WriteProfile, Option<EpochThread>)> {
647 use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};
648
649 let module_name = "<main>";
650
651 store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
652 module_name,
653 interval,
654 component,
655 std::iter::empty(),
656 )));
657
658 fn sample(
659 mut store: StoreContextMut<Host>,
660 f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
661 ) {
662 let mut profiler = store.data_mut().guest_profiler.take().unwrap();
663 f(
664 Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
665 store.as_context(),
666 );
667 store.data_mut().guest_profiler = Some(profiler);
668 }
669
670 store.call_hook(|store, kind| {
672 sample(store, |profiler, store| profiler.call_hook(store, kind));
673 Ok(())
674 });
675
676 let start = Instant::now();
677 let timeout = cmd.run.common.wasm.timeout;
678 store.epoch_deadline_callback(move |store| {
679 sample(store, |profiler, store| {
680 profiler.sample(store, std::time::Duration::ZERO)
681 });
682
683 if let Some(timeout) = timeout {
687 if start.elapsed() > timeout {
688 bail!("Timeout expired");
689 }
690 }
691
692 Ok(UpdateDeadline::Continue(1))
693 });
694
695 store.set_epoch_deadline(1);
696 let engine = store.engine().clone();
697 let epoch_thread = Some(EpochThread::spawn(interval, engine));
698
699 let write_profile = Box::new(move |store: &mut Store<Host>| {
700 let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
701 .expect("profiling doesn't support threads yet");
702 if let Err(e) = std::fs::File::create(&path)
703 .map_err(anyhow::Error::new)
704 .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
705 {
706 eprintln!("failed writing profile at {path}: {e:#}");
707 } else {
708 eprintln!();
709 eprintln!("Profile written to: {path}");
710 eprintln!("View this profile at https://profiler.firefox.com/.");
711 }
712 });
713
714 Ok((write_profile, epoch_thread))
715}
716
717struct ProxyHandlerInner {
718 cmd: ServeCommand,
719 engine: Engine,
720 instance_pre: ProxyPre<Host>,
721 next_id: AtomicU64,
722}
723
724impl ProxyHandlerInner {
725 fn next_req_id(&self) -> u64 {
726 self.next_id.fetch_add(1, Ordering::Relaxed)
727 }
728}
729
730#[derive(Clone)]
731struct ProxyHandler(Arc<ProxyHandlerInner>);
732
733impl ProxyHandler {
734 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
735 Self(Arc::new(ProxyHandlerInner {
736 cmd,
737 engine,
738 instance_pre,
739 next_id: AtomicU64::from(0),
740 }))
741 }
742}
743
744type Request = hyper::Request<hyper::body::Incoming>;
745
746async fn handle_request(
747 ProxyHandler(inner): ProxyHandler,
748 req: Request,
749 component: Component,
750) -> Result<hyper::Response<HyperOutgoingBody>> {
751 let (sender, receiver) = tokio::sync::oneshot::channel();
752
753 let req_id = inner.next_req_id();
754
755 log::info!(
756 "Request {req_id} handling {} to {}",
757 req.method(),
758 req.uri()
759 );
760
761 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
762
763 let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
764 let out = store.data_mut().new_response_outparam(sender)?;
765 let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
766
767 let comp = component.clone();
768 let task = tokio::task::spawn(async move {
769 let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;
770
771 if let Err(e) = proxy
772 .wasi_http_incoming_handler()
773 .call_handle(&mut store, req, out)
774 .await
775 {
776 log::error!("[{req_id}] :: {e:?}");
777 return Err(e);
778 }
779
780 write_profile(&mut store);
781 drop(epoch_thread);
782
783 Ok(())
784 });
785
786 let result = match receiver.await {
787 Ok(Ok(resp)) => Ok(resp),
788 Ok(Err(e)) => Err(e.into()),
789 Err(_) => {
790 let e = match task.await {
798 Ok(Ok(())) => {
799 bail!("guest never invoked `response-outparam::set` method")
800 }
801 Ok(Err(e)) => e,
802 Err(e) => e.into(),
803 };
804 Err(e.context("guest never invoked `response-outparam::set` method"))
805 }
806 };
807
808 result
809}
810
811#[derive(Clone)]
812enum Output {
813 Stdout,
814 Stderr,
815}
816
817impl Output {
818 fn write_all(&self, buf: &[u8]) -> io::Result<()> {
819 use std::io::Write;
820
821 match self {
822 Output::Stdout => std::io::stdout().write_all(buf),
823 Output::Stderr => std::io::stderr().write_all(buf),
824 }
825 }
826}
827
828#[derive(Clone)]
829struct LogStream {
830 output: Output,
831 state: Arc<LogStreamState>,
832}
833
834struct LogStreamState {
835 prefix: String,
836 needs_prefix_on_next_write: AtomicBool,
837}
838
839impl LogStream {
840 fn new(prefix: String, output: Output) -> LogStream {
841 LogStream {
842 output,
843 state: Arc::new(LogStreamState {
844 prefix,
845 needs_prefix_on_next_write: AtomicBool::new(true),
846 }),
847 }
848 }
849
850 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
851 while !bytes.is_empty() {
852 if self
853 .state
854 .needs_prefix_on_next_write
855 .load(Ordering::Relaxed)
856 {
857 self.output.write_all(self.state.prefix.as_bytes())?;
858 self.state
859 .needs_prefix_on_next_write
860 .store(false, Ordering::Relaxed);
861 }
862 match bytes.iter().position(|b| *b == b'\n') {
863 Some(i) => {
864 let (a, b) = bytes.split_at(i + 1);
865 bytes = b;
866 self.output.write_all(a)?;
867 self.state
868 .needs_prefix_on_next_write
869 .store(true, Ordering::Relaxed);
870 }
871 None => {
872 self.output.write_all(bytes)?;
873 break;
874 }
875 }
876 }
877
878 Ok(())
879 }
880}
881
882impl wasmtime_wasi::cli::StdoutStream for LogStream {
883 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
884 Box::new(self.clone())
885 }
886 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
887 Box::new(self.clone())
888 }
889}
890
891impl wasmtime_wasi::cli::IsTerminal for LogStream {
892 fn is_terminal(&self) -> bool {
893 match &self.output {
894 Output::Stdout => std::io::stdout().is_terminal(),
895 Output::Stderr => std::io::stderr().is_terminal(),
896 }
897 }
898}
899
900impl wasmtime_wasi::p2::OutputStream for LogStream {
901 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
902 self.write_all(&bytes)
903 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
904 Ok(())
905 }
906
907 fn flush(&mut self) -> StreamResult<()> {
908 Ok(())
909 }
910
911 fn check_write(&mut self) -> StreamResult<usize> {
912 Ok(1024 * 1024)
913 }
914}
915
916#[async_trait::async_trait]
917impl wasmtime_wasi::p2::Pollable for LogStream {
918 async fn ready(&mut self) {}
919}
920
921impl AsyncWrite for LogStream {
922 fn poll_write(
923 mut self: Pin<&mut Self>,
924 _cx: &mut Context<'_>,
925 buf: &[u8],
926 ) -> Poll<io::Result<usize>> {
927 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
928 }
929 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
930 Poll::Ready(Ok(()))
931 }
932 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
933 Poll::Ready(Ok(()))
934 }
935}
936
937fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
959 use wasmtime::{Config, Memory, MemoryType};
960 const BITS_TO_TEST: u32 = 42;
961 let mut config = Config::new();
962 config.wasm_memory64(true);
963 config.memory_reservation(1 << BITS_TO_TEST);
964 let engine = Engine::new(&config)?;
965 let mut store = Store::new(&engine, ());
966 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
969 if Memory::new(&mut store, ty).is_ok() {
970 Ok(Some(true))
971 } else {
972 Ok(None)
973 }
974}