1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, bail};
3use clap::Parser;
4use http::{Response, StatusCode};
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Instant;
10use std::{
11 path::PathBuf,
12 sync::{
13 Arc, Mutex,
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 },
16 time::Duration,
17};
18use tokio::io::{self, AsyncWrite};
19use tokio::sync::Notify;
20use wasmtime::component::{Component, Linker, ResourceTable};
21use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
22use wasmtime_wasi::p2::{StreamError, StreamResult};
23use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
24use wasmtime_wasi_http::bindings::ProxyPre;
25use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
26use wasmtime_wasi_http::io::TokioIo;
27use wasmtime_wasi_http::{
28 DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
29 WasiHttpView, body::HyperOutgoingBody,
30};
31
32#[cfg(feature = "wasi-config")]
33use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
34#[cfg(feature = "wasi-keyvalue")]
35use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
36#[cfg(feature = "wasi-nn")]
37use wasmtime_wasi_nn::wit::WasiNnCtx;
38
39struct Host {
40 table: wasmtime::component::ResourceTable,
41 ctx: WasiCtx,
42 http: WasiHttpCtx,
43 http_outgoing_body_buffer_chunks: Option<usize>,
44 http_outgoing_body_chunk_size: Option<usize>,
45
46 limits: StoreLimits,
47
48 #[cfg(feature = "wasi-nn")]
49 nn: Option<WasiNnCtx>,
50
51 #[cfg(feature = "wasi-config")]
52 wasi_config: Option<WasiConfigVariables>,
53
54 #[cfg(feature = "wasi-keyvalue")]
55 wasi_keyvalue: Option<WasiKeyValueCtx>,
56
57 #[cfg(feature = "profiling")]
58 guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
59}
60
61impl WasiView for Host {
62 fn ctx(&mut self) -> WasiCtxView<'_> {
63 WasiCtxView {
64 ctx: &mut self.ctx,
65 table: &mut self.table,
66 }
67 }
68}
69
70impl WasiHttpView for Host {
71 fn ctx(&mut self) -> &mut WasiHttpCtx {
72 &mut self.http
73 }
74 fn table(&mut self) -> &mut ResourceTable {
75 &mut self.table
76 }
77
78 fn outgoing_body_buffer_chunks(&mut self) -> usize {
79 self.http_outgoing_body_buffer_chunks
80 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
81 }
82
83 fn outgoing_body_chunk_size(&mut self) -> usize {
84 self.http_outgoing_body_chunk_size
85 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
86 }
87}
88
/// Default value of the `--addr` option: the unspecified IPv4 address
/// (`0.0.0.0`) on port 8080.
const DEFAULT_ADDR: SocketAddr =
    SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 8080);
93
// Command-line arguments of the `serve` command, parsed by clap.
//
// NOTE: the explanatory comments in this struct are deliberately plain `//`
// comments rather than `///` doc comments: clap's derive turns doc comments
// into help text, so adding them would change the command's `--help` output.
#[derive(Parser)]
pub struct ServeCommand {
    // Options defined by `RunCommon`, flattened into this command's own
    // argument list.
    #[command(flatten)]
    run: RunCommon,

    // `--addr`: socket address the HTTP server binds to. Defaults to
    // `DEFAULT_ADDR`.
    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    // `--shutdown-addr`: optional socket address to listen on; the first
    // connection accepted there requests a graceful shutdown (see `serve`).
    #[arg(long, value_name = "SOCKADDR")]
    shutdown_addr: Option<SocketAddr>,

    // `--no-logging-prefix`: when set, guest stdout/stderr lines are emitted
    // without the `stdout [id] :: ` / `stderr [id] :: ` prefix.
    #[arg(long)]
    no_logging_prefix: bool,

    // Required positional argument: path of the file handed to
    // `RunCommon::load_module`, which must yield a component.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}
120
121impl ServeCommand {
122 pub fn execute(mut self) -> Result<()> {
124 self.run.common.init_logging()?;
125
126 if self.run.common.wasi.nn == Some(true) {
130 #[cfg(not(feature = "wasi-nn"))]
131 {
132 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
133 }
134 }
135
136 if self.run.common.wasi.threads == Some(true) {
137 bail!("wasi-threads does not support components yet")
138 }
139
140 if self.run.common.wasi.http.replace(true) == Some(false) {
143 bail!("wasi-http is required for the serve command, and must not be disabled");
144 }
145 if self.run.common.wasm.component_model.replace(true) == Some(false) {
146 bail!("components are required for the serve command, and must not be disabled");
147 }
148
149 let runtime = tokio::runtime::Builder::new_multi_thread()
150 .enable_time()
151 .enable_io()
152 .build()?;
153
154 runtime.block_on(self.serve())?;
155
156 Ok(())
157 }
158
159 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
160 let mut builder = WasiCtxBuilder::new();
161 self.run.configure_wasip2(&mut builder)?;
162
163 builder.env("REQUEST_ID", req_id.to_string());
164
165 let stdout_prefix: String;
166 let stderr_prefix: String;
167 if self.no_logging_prefix {
168 stdout_prefix = "".to_string();
169 stderr_prefix = "".to_string();
170 } else {
171 stdout_prefix = format!("stdout [{req_id}] :: ");
172 stderr_prefix = format!("stderr [{req_id}] :: ");
173 }
174 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
175 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
176
177 let mut host = Host {
178 table: wasmtime::component::ResourceTable::new(),
179 ctx: builder.build(),
180 http: WasiHttpCtx::new(),
181 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
182 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
183
184 limits: StoreLimits::default(),
185
186 #[cfg(feature = "wasi-nn")]
187 nn: None,
188 #[cfg(feature = "wasi-config")]
189 wasi_config: None,
190 #[cfg(feature = "wasi-keyvalue")]
191 wasi_keyvalue: None,
192 #[cfg(feature = "profiling")]
193 guest_profiler: None,
194 };
195
196 if self.run.common.wasi.nn == Some(true) {
197 #[cfg(feature = "wasi-nn")]
198 {
199 let graphs = self
200 .run
201 .common
202 .wasi
203 .nn_graph
204 .iter()
205 .map(|g| (g.format.clone(), g.dir.clone()))
206 .collect::<Vec<_>>();
207 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
208 host.nn.replace(WasiNnCtx::new(backends, registry));
209 }
210 }
211
212 if self.run.common.wasi.config == Some(true) {
213 #[cfg(feature = "wasi-config")]
214 {
215 let vars = WasiConfigVariables::from_iter(
216 self.run
217 .common
218 .wasi
219 .config_var
220 .iter()
221 .map(|v| (v.key.clone(), v.value.clone())),
222 );
223 host.wasi_config.replace(vars);
224 }
225 }
226
227 if self.run.common.wasi.keyvalue == Some(true) {
228 #[cfg(feature = "wasi-keyvalue")]
229 {
230 let ctx = WasiKeyValueCtxBuilder::new()
231 .in_memory_data(
232 self.run
233 .common
234 .wasi
235 .keyvalue_in_memory_data
236 .iter()
237 .map(|v| (v.key.clone(), v.value.clone())),
238 )
239 .build();
240 host.wasi_keyvalue.replace(ctx);
241 }
242 }
243
244 let mut store = Store::new(engine, host);
245
246 store.data_mut().limits = self.run.store_limits();
247 store.limiter(|t| &mut t.limits);
248
249 if let Some(fuel) = self.run.common.wasm.fuel {
252 store.set_fuel(fuel)?;
253 }
254
255 Ok(store)
256 }
257
258 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
259 let mut cli = self.run.common.wasi.cli;
260
261 if let Some(common) = self.run.common.wasi.common {
263 if cli.is_some() {
264 bail!(
265 "The -Scommon option should not be use with -Scli as it is a deprecated alias"
266 );
267 } else {
268 cli = Some(common);
271 }
272 }
273
274 if cli == Some(true) {
283 let link_options = self.run.compute_wasi_features();
284 wasmtime_wasi::p2::add_to_linker_with_options_async(linker, &link_options)?;
285 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
286 } else {
287 wasmtime_wasi_http::add_to_linker_async(linker)?;
288 }
289
290 if self.run.common.wasi.nn == Some(true) {
291 #[cfg(not(feature = "wasi-nn"))]
292 {
293 bail!("support for wasi-nn was disabled at compile time");
294 }
295 #[cfg(feature = "wasi-nn")]
296 {
297 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
298 let ctx = h.nn.as_mut().unwrap();
299 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
300 })?;
301 }
302 }
303
304 if self.run.common.wasi.config == Some(true) {
305 #[cfg(not(feature = "wasi-config"))]
306 {
307 bail!("support for wasi-config was disabled at compile time");
308 }
309 #[cfg(feature = "wasi-config")]
310 {
311 wasmtime_wasi_config::add_to_linker(linker, |h| {
312 WasiConfig::from(h.wasi_config.as_ref().unwrap())
313 })?;
314 }
315 }
316
317 if self.run.common.wasi.keyvalue == Some(true) {
318 #[cfg(not(feature = "wasi-keyvalue"))]
319 {
320 bail!("support for wasi-keyvalue was disabled at compile time");
321 }
322 #[cfg(feature = "wasi-keyvalue")]
323 {
324 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
325 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
326 })?;
327 }
328 }
329
330 if self.run.common.wasi.threads == Some(true) {
331 bail!("support for wasi-threads is not available with components");
332 }
333
334 if self.run.common.wasi.http == Some(false) {
335 bail!("support for wasi-http must be enabled for `serve` subcommand");
336 }
337
338 Ok(())
339 }
340
341 async fn serve(mut self) -> Result<()> {
342 use hyper::server::conn::http1;
343
344 let mut config = self
345 .run
346 .common
347 .config(use_pooling_allocator_by_default().unwrap_or(None))?;
348 config.wasm_component_model(true);
349 config.async_support(true);
350
351 if self.run.common.wasm.timeout.is_some() {
352 config.epoch_interruption(true);
353 }
354
355 match self.run.profile {
356 Some(Profile::Native(s)) => {
357 config.profiler(s);
358 }
359 Some(Profile::Guest { .. }) => {
360 config.epoch_interruption(true);
361 }
362 None => {}
363 }
364
365 let engine = Engine::new(&config)?;
366 let mut linker = Linker::new(&engine);
367
368 self.add_to_linker(&mut linker)?;
369
370 let component = match self.run.load_module(&engine, &self.component)? {
371 RunTarget::Core(_) => bail!("The serve command currently requires a component"),
372 RunTarget::Component(c) => c,
373 };
374
375 let instance = linker.instantiate_pre(&component)?;
376 let instance = ProxyPre::new(instance)?;
377
378 let shutdown = Arc::new(GracefulShutdown::default());
382 tokio::task::spawn({
383 let shutdown = shutdown.clone();
384 async move {
385 tokio::signal::ctrl_c().await.unwrap();
386 shutdown.requested.notify_one();
387 }
388 });
389 if let Some(addr) = self.shutdown_addr {
390 let listener = tokio::net::TcpListener::bind(addr).await?;
391 eprintln!(
392 "Listening for shutdown on tcp://{}/",
393 listener.local_addr()?
394 );
395 let shutdown = shutdown.clone();
396 tokio::task::spawn(async move {
397 let _ = listener.accept().await;
398 shutdown.requested.notify_one();
399 });
400 }
401
402 let socket = match &self.addr {
403 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
404 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
405 };
406 socket.set_reuseaddr(!cfg!(windows))?;
415 socket.bind(self.addr)?;
416 let listener = socket.listen(100)?;
417
418 eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);
419
420 log::info!("Listening on {}", self.addr);
421
422 let handler = ProxyHandler::new(self, engine, instance);
423
424 loop {
425 let (stream, _) = tokio::select! {
429 _ = shutdown.requested.notified() => break,
430 v = listener.accept() => v?,
431 };
432 let comp = component.clone();
433 let stream = TokioIo::new(stream);
434 let h = handler.clone();
435 let shutdown_guard = shutdown.clone().increment();
436 tokio::task::spawn(async move {
437 if let Err(e) = http1::Builder::new()
438 .keep_alive(true)
439 .serve_connection(
440 stream,
441 hyper::service::service_fn(move |req| {
442 let comp = comp.clone();
443 let h = h.clone();
444 async move {
445 use http_body_util::{BodyExt, Full};
446 fn to_errorcode(_: Infallible) -> ErrorCode {
447 unreachable!()
448 }
449 match handle_request(h, req, comp).await {
450 Ok(r) => Ok::<_, Infallible>(r),
451 Err(e) => {
452 eprintln!("error: {e:?}");
453 let error_html = "\
454<!doctype html>
455<html>
456<head>
457 <title>500 Internal Server Error</title>
458</head>
459<body>
460 <center>
461 <h1>500 Internal Server Error</h1>
462 <hr>
463 wasmtime
464 </center>
465</body>
466</html>";
467 Ok(Response::builder()
468 .status(StatusCode::INTERNAL_SERVER_ERROR)
469 .header("Content-Type", "text/html; charset=UTF-8")
470 .body(
471 Full::new(bytes::Bytes::from(error_html))
472 .map_err(to_errorcode)
473 .boxed(),
474 )
475 .unwrap())
476 }
477 }
478 }
479 }),
480 )
481 .await
482 {
483 eprintln!("error: {e:?}");
484 }
485 drop(shutdown_guard);
486 });
487 }
488
489 if shutdown.close() {
495 return Ok(());
496 }
497 eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
498 tokio::select! {
499 _ = tokio::signal::ctrl_c() => {}
500 _ = shutdown.complete.notified() => {}
501 }
502
503 Ok(())
504 }
505}
506
/// Shared bookkeeping for shutting the server down gracefully.
#[derive(Default)]
struct GracefulShutdown {
    /// Notified when a shutdown has been requested (by the ctrl-c task or
    /// the `--shutdown-addr` listener); awaited by the accept loop in `serve`.
    requested: Notify,
    /// Notified by the last connection guard to be dropped after `close` has
    /// been called; awaited by `serve` before it returns.
    complete: Notify,
    /// Task count and "closing" flag, guarded by a mutex.
    state: Mutex<GracefulShutdownState>,
}
518
/// Mutable part of `GracefulShutdown`, kept behind its mutex.
#[derive(Default)]
struct GracefulShutdownState {
    /// Number of guards handed out by `increment` that have not been dropped
    /// yet.
    active_tasks: u32,
    /// Set by `close`; once true, the guard that brings `active_tasks` to
    /// zero notifies `complete`.
    notify_when_done: bool,
}
524
525impl GracefulShutdown {
526 fn increment(self: Arc<Self>) -> impl Drop {
528 struct Guard(Arc<GracefulShutdown>);
529
530 let mut state = self.state.lock().unwrap();
531 assert!(!state.notify_when_done);
532 state.active_tasks += 1;
533 drop(state);
534
535 return Guard(self);
536
537 impl Drop for Guard {
538 fn drop(&mut self) {
539 let mut state = self.0.state.lock().unwrap();
540 state.active_tasks -= 1;
541 if state.notify_when_done && state.active_tasks == 0 {
542 self.0.complete.notify_one();
543 }
544 }
545 }
546 }
547
548 fn close(&self) -> bool {
551 let mut state = self.state.lock().unwrap();
552 state.notify_when_done = true;
553 state.active_tasks == 0
554 }
555}
556
/// Interval at which the epoch thread started by `setup_epoch_handler` bumps
/// the engine epoch when only a request timeout (no guest profiling) is
/// configured.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
561
562struct EpochThread {
563 shutdown: Arc<AtomicBool>,
564 handle: Option<std::thread::JoinHandle<()>>,
565}
566
567impl EpochThread {
568 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
569 let shutdown = Arc::new(AtomicBool::new(false));
570 let handle = {
571 let shutdown = Arc::clone(&shutdown);
572 let handle = std::thread::spawn(move || {
573 while !shutdown.load(Ordering::Relaxed) {
574 std::thread::sleep(interval);
575 engine.increment_epoch();
576 }
577 });
578 Some(handle)
579 };
580
581 EpochThread { shutdown, handle }
582 }
583}
584
585impl Drop for EpochThread {
586 fn drop(&mut self) {
587 if let Some(handle) = self.handle.take() {
588 self.shutdown.store(true, Ordering::Relaxed);
589 handle.join().unwrap();
590 }
591 }
592}
593
/// Callback that `handle_request` runs once a request's handler has returned
/// successfully. `setup_guest_profiler` supplies one that writes the guest
/// profile; without guest profiling it is a no-op.
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
595
596fn setup_epoch_handler(
597 cmd: &ServeCommand,
598 store: &mut Store<Host>,
599 component: Component,
600) -> Result<(WriteProfile, Option<EpochThread>)> {
601 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
603 #[cfg(feature = "profiling")]
604 return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
605 #[cfg(not(feature = "profiling"))]
606 {
607 let _ = (path, interval);
608 bail!("support for profiling disabled at compile time!");
609 }
610 }
611
612 let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
614 let start = Instant::now();
615 store.epoch_deadline_callback(move |_store| {
616 if start.elapsed() > timeout {
617 bail!("Timeout expired");
618 }
619 Ok(UpdateDeadline::Continue(1))
620 });
621 store.set_epoch_deadline(1);
622 let engine = store.engine().clone();
623 Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
624 } else {
625 None
626 };
627
628 Ok((Box::new(|_store| {}), epoch_thread))
629}
630
/// Installs a guest profiler on `store`.
///
/// The profiler is fed from two places: the store's call hook, and the epoch
/// deadline callback, which also enforces the request timeout if one is
/// configured. An `EpochThread` ticking every `interval` drives the deadline
/// callback.
///
/// Returns a callback that writes the collected profile to `path`, plus the
/// epoch thread.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    /// Takes the profiler out of the store for the duration of `f`, so that
    /// `f` can receive the profiler and a view of the store at the same
    /// time, then puts it back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut shared = store.data_mut().guest_profiler.take().unwrap();
        let profiler = Arc::get_mut(&mut shared).expect("profiling doesn't support threads yet");
        f(profiler, store.as_context());
        store.data_mut().guest_profiler = Some(shared);
    }

    let profiler = GuestProfiler::new_component("<main>", interval, component, std::iter::empty());
    store.data_mut().guest_profiler = Some(Arc::new(profiler));

    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let timeout = cmd.run.common.wasm.timeout;
    let start = Instant::now();
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, Duration::ZERO)
        });

        // The timeout is measured in wall-clock time since setup.
        if timeout.is_some_and(|limit| start.elapsed() > limit) {
            bail!("Timeout expired");
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let epoch_thread = Some(EpochThread::spawn(interval, store.engine().clone()));

    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let shared = store.data_mut().guest_profiler.take().unwrap();
        let profiler = Arc::try_unwrap(shared).expect("profiling doesn't support threads yet");
        let written = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)));
        match written {
            Err(e) => {
                eprintln!("failed writing profile at {path}: {e:#}");
            }
            Ok(_) => {
                eprintln!();
                eprintln!("Profile written to: {path}");
                eprintln!("View this profile at https://profiler.firefox.com/.");
            }
        }
    });

    Ok((write_profile, epoch_thread))
}
708
/// State shared by every request, held behind the `Arc` in `ProxyHandler`.
struct ProxyHandlerInner {
    /// The parsed command; used to build per-request stores.
    cmd: ServeCommand,
    /// Engine each per-request store is created from.
    engine: Engine,
    /// Pre-instantiated component, instantiated once per request.
    instance_pre: ProxyPre<Host>,
    /// Counter backing `next_req_id`.
    next_id: AtomicU64,
}

impl ProxyHandlerInner {
    /// Returns the current request id and advances the counter by one.
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}
721
722#[derive(Clone)]
723struct ProxyHandler(Arc<ProxyHandlerInner>);
724
725impl ProxyHandler {
726 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
727 Self(Arc::new(ProxyHandlerInner {
728 cmd,
729 engine,
730 instance_pre,
731 next_id: AtomicU64::from(0),
732 }))
733 }
734}
735
/// Incoming HTTP request as delivered by hyper, with its streaming body.
type Request = hyper::Request<hyper::body::Incoming>;
737
738async fn handle_request(
739 ProxyHandler(inner): ProxyHandler,
740 req: Request,
741 component: Component,
742) -> Result<hyper::Response<HyperOutgoingBody>> {
743 let (sender, receiver) = tokio::sync::oneshot::channel();
744
745 let req_id = inner.next_req_id();
746
747 log::info!(
748 "Request {req_id} handling {} to {}",
749 req.method(),
750 req.uri()
751 );
752
753 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
754
755 let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
756 let out = store.data_mut().new_response_outparam(sender)?;
757 let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
758
759 let comp = component.clone();
760 let task = tokio::task::spawn(async move {
761 let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;
762
763 if let Err(e) = proxy
764 .wasi_http_incoming_handler()
765 .call_handle(&mut store, req, out)
766 .await
767 {
768 log::error!("[{req_id}] :: {e:?}");
769 return Err(e);
770 }
771
772 write_profile(&mut store);
773 drop(epoch_thread);
774
775 Ok(())
776 });
777
778 let result = match receiver.await {
779 Ok(Ok(resp)) => Ok(resp),
780 Ok(Err(e)) => Err(e.into()),
781 Err(_) => {
782 let e = match task.await {
790 Ok(Ok(())) => {
791 bail!("guest never invoked `response-outparam::set` method")
792 }
793 Ok(Err(e)) => e,
794 Err(e) => e.into(),
795 };
796 Err(e.context("guest never invoked `response-outparam::set` method"))
797 }
798 };
799
800 result
801}
802
/// Which of the host's standard streams a `LogStream` forwards to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    /// Writes all of `buf` to the selected host stream.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying stream.
    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
        use std::io::Write;

        match self {
            Output::Stdout => {
                let mut sink = std::io::stdout().lock();
                sink.write_all(buf)
            }
            Output::Stderr => {
                let mut sink = std::io::stderr().lock();
                sink.write_all(buf)
            }
        }
    }
}
819
820#[derive(Clone)]
821struct LogStream {
822 output: Output,
823 state: Arc<LogStreamState>,
824}
825
826struct LogStreamState {
827 prefix: String,
828 needs_prefix_on_next_write: AtomicBool,
829}
830
831impl LogStream {
832 fn new(prefix: String, output: Output) -> LogStream {
833 LogStream {
834 output,
835 state: Arc::new(LogStreamState {
836 prefix,
837 needs_prefix_on_next_write: AtomicBool::new(true),
838 }),
839 }
840 }
841
842 fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
843 while !bytes.is_empty() {
844 if self
845 .state
846 .needs_prefix_on_next_write
847 .load(Ordering::Relaxed)
848 {
849 self.output.write_all(self.state.prefix.as_bytes())?;
850 self.state
851 .needs_prefix_on_next_write
852 .store(false, Ordering::Relaxed);
853 }
854 match bytes.iter().position(|b| *b == b'\n') {
855 Some(i) => {
856 let (a, b) = bytes.split_at(i + 1);
857 bytes = b;
858 self.output.write_all(a)?;
859 self.state
860 .needs_prefix_on_next_write
861 .store(true, Ordering::Relaxed);
862 }
863 None => {
864 self.output.write_all(bytes)?;
865 break;
866 }
867 }
868 }
869
870 Ok(())
871 }
872}
873
874impl wasmtime_wasi::cli::StdoutStream for LogStream {
875 fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
876 Box::new(self.clone())
877 }
878 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
879 Box::new(self.clone())
880 }
881}
882
883impl wasmtime_wasi::cli::IsTerminal for LogStream {
884 fn is_terminal(&self) -> bool {
885 match &self.output {
886 Output::Stdout => std::io::stdout().is_terminal(),
887 Output::Stderr => std::io::stderr().is_terminal(),
888 }
889 }
890}
891
892impl wasmtime_wasi::p2::OutputStream for LogStream {
893 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
894 self.write_all(&bytes)
895 .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
896 Ok(())
897 }
898
899 fn flush(&mut self) -> StreamResult<()> {
900 Ok(())
901 }
902
903 fn check_write(&mut self) -> StreamResult<usize> {
904 Ok(1024 * 1024)
905 }
906}
907
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    /// Completes immediately: the body is empty, so there is nothing to wait
    /// for.
    async fn ready(&mut self) {}
}
912
913impl AsyncWrite for LogStream {
914 fn poll_write(
915 mut self: Pin<&mut Self>,
916 _cx: &mut Context<'_>,
917 buf: &[u8],
918 ) -> Poll<io::Result<usize>> {
919 Poll::Ready(self.write_all(buf).map(|_| buf.len()))
920 }
921 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
922 Poll::Ready(Ok(()))
923 }
924 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
925 Poll::Ready(Ok(()))
926 }
927}
928
929fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
951 use wasmtime::{Config, Memory, MemoryType};
952 const BITS_TO_TEST: u32 = 42;
953 let mut config = Config::new();
954 config.wasm_memory64(true);
955 config.memory_reservation(1 << BITS_TO_TEST);
956 let engine = Engine::new(&config)?;
957 let mut store = Store::new(&engine, ());
958 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
961 if Memory::new(&mut store, ty).is_ok() {
962 Ok(Some(true))
963 } else {
964 Ok(None)
965 }
966}