1use crate::common::{Profile, RunCommon, RunTarget};
2use anyhow::{Result, anyhow, bail};
3use clap::Parser;
4use http::{Response, StatusCode};
5use std::convert::Infallible;
6use std::net::SocketAddr;
7use std::time::Instant;
8use std::{
9 path::PathBuf,
10 sync::{
11 Arc, Mutex,
12 atomic::{AtomicBool, AtomicU64, Ordering},
13 },
14 time::Duration,
15};
16use tokio::sync::Notify;
17use wasmtime::component::{Component, Linker};
18use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
19use wasmtime_wasi::p2::{IoView, StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
20use wasmtime_wasi_http::bindings::ProxyPre;
21use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
22use wasmtime_wasi_http::io::TokioIo;
23use wasmtime_wasi_http::{
24 DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
25 WasiHttpView, body::HyperOutgoingBody,
26};
27
28#[cfg(feature = "wasi-config")]
29use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
30#[cfg(feature = "wasi-keyvalue")]
31use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
32#[cfg(feature = "wasi-nn")]
33use wasmtime_wasi_nn::wit::WasiNnCtx;
34
/// Per-store host state for the `serve` command.
///
/// `ServeCommand::new_store` builds one of these for every store it creates
/// (and `handle_request` creates one store per request), so nothing in here
/// is shared between requests.
struct Host {
    /// Resource table returned by `IoView::table`.
    table: wasmtime::component::ResourceTable,
    /// WASI context returned by `WasiView::ctx`.
    ctx: WasiCtx,
    /// wasi-http context returned by `WasiHttpView::ctx`.
    http: WasiHttpCtx,
    /// Optional override; `DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS` is used when `None`.
    http_outgoing_body_buffer_chunks: Option<usize>,
    /// Optional override; `DEFAULT_OUTGOING_BODY_CHUNK_SIZE` is used when `None`.
    http_outgoing_body_chunk_size: Option<usize>,

    /// Resource limits; `new_store` registers this field as the store's limiter.
    limits: StoreLimits,

    /// Populated by `new_store` only when wasi-nn was requested.
    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    /// Populated by `new_store` only when wasi-config was requested.
    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    /// Populated by `new_store` only when wasi-keyvalue was requested.
    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    /// Installed by `setup_guest_profiler`; taken back out when the profile
    /// is written.
    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
56
57impl IoView for Host {
58 fn table(&mut self) -> &mut wasmtime::component::ResourceTable {
59 &mut self.table
60 }
61}
62impl WasiView for Host {
63 fn ctx(&mut self) -> &mut WasiCtx {
64 &mut self.ctx
65 }
66}
67
68impl WasiHttpView for Host {
69 fn ctx(&mut self) -> &mut WasiHttpCtx {
70 &mut self.http
71 }
72
73 fn outgoing_body_buffer_chunks(&mut self) -> usize {
74 self.http_outgoing_body_buffer_chunks
75 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
76 }
77
78 fn outgoing_body_chunk_size(&mut self) -> usize {
79 self.http_outgoing_body_chunk_size
80 .unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
81 }
82}
83
/// Address the HTTP server binds to when `--addr` is not given:
/// the unspecified IPv4 address (`0.0.0.0`) on port 8080.
const DEFAULT_ADDR: SocketAddr = SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
    8080,
);
88
// Command-line arguments of the `serve` subcommand, which runs a component
// behind an HTTP/1 server (see `ServeCommand::serve`).
#[derive(Parser)]
pub struct ServeCommand {
    // Options shared with other run-style commands.
    #[command(flatten)]
    run: RunCommon,

    // Socket address the HTTP listener binds to; defaults to `DEFAULT_ADDR`.
    #[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    // Optional socket address for a second TCP listener. `serve` requests a
    // graceful shutdown as soon as that listener's first `accept` returns.
    #[arg(long, value_name = "SOCKADDR")]
    shutdown_addr: Option<SocketAddr>,

    // When set, guest stdout/stderr lines are emitted without the
    // `stdout [id] :: ` / `stderr [id] :: ` prefixes added by `new_store`.
    #[arg(long)]
    no_logging_prefix: bool,

    // Path of the WebAssembly file to serve; it must load as a component.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}
115
116impl ServeCommand {
117 pub fn execute(mut self) -> Result<()> {
119 self.run.common.init_logging()?;
120
121 if self.run.common.wasi.nn == Some(true) {
125 #[cfg(not(feature = "wasi-nn"))]
126 {
127 bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
128 }
129 }
130
131 if self.run.common.wasi.threads == Some(true) {
132 bail!("wasi-threads does not support components yet")
133 }
134
135 if self.run.common.wasi.http.replace(true) == Some(false) {
138 bail!("wasi-http is required for the serve command, and must not be disabled");
139 }
140 if self.run.common.wasm.component_model.replace(true) == Some(false) {
141 bail!("components are required for the serve command, and must not be disabled");
142 }
143
144 let runtime = tokio::runtime::Builder::new_multi_thread()
145 .enable_time()
146 .enable_io()
147 .build()?;
148
149 runtime.block_on(self.serve())?;
150
151 Ok(())
152 }
153
154 fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
155 let mut builder = WasiCtxBuilder::new();
156 self.run.configure_wasip2(&mut builder)?;
157
158 builder.env("REQUEST_ID", req_id.to_string());
159
160 let stdout_prefix: String;
161 let stderr_prefix: String;
162 if self.no_logging_prefix {
163 stdout_prefix = "".to_string();
164 stderr_prefix = "".to_string();
165 } else {
166 stdout_prefix = format!("stdout [{req_id}] :: ");
167 stderr_prefix = format!("stderr [{req_id}] :: ");
168 }
169 builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
170 builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));
171
172 let mut host = Host {
173 table: wasmtime::component::ResourceTable::new(),
174 ctx: builder.build(),
175 http: WasiHttpCtx::new(),
176 http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
177 http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,
178
179 limits: StoreLimits::default(),
180
181 #[cfg(feature = "wasi-nn")]
182 nn: None,
183 #[cfg(feature = "wasi-config")]
184 wasi_config: None,
185 #[cfg(feature = "wasi-keyvalue")]
186 wasi_keyvalue: None,
187 #[cfg(feature = "profiling")]
188 guest_profiler: None,
189 };
190
191 if self.run.common.wasi.nn == Some(true) {
192 #[cfg(feature = "wasi-nn")]
193 {
194 let graphs = self
195 .run
196 .common
197 .wasi
198 .nn_graph
199 .iter()
200 .map(|g| (g.format.clone(), g.dir.clone()))
201 .collect::<Vec<_>>();
202 let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
203 host.nn.replace(WasiNnCtx::new(backends, registry));
204 }
205 }
206
207 if self.run.common.wasi.config == Some(true) {
208 #[cfg(feature = "wasi-config")]
209 {
210 let vars = WasiConfigVariables::from_iter(
211 self.run
212 .common
213 .wasi
214 .config_var
215 .iter()
216 .map(|v| (v.key.clone(), v.value.clone())),
217 );
218 host.wasi_config.replace(vars);
219 }
220 }
221
222 if self.run.common.wasi.keyvalue == Some(true) {
223 #[cfg(feature = "wasi-keyvalue")]
224 {
225 let ctx = WasiKeyValueCtxBuilder::new()
226 .in_memory_data(
227 self.run
228 .common
229 .wasi
230 .keyvalue_in_memory_data
231 .iter()
232 .map(|v| (v.key.clone(), v.value.clone())),
233 )
234 .build();
235 host.wasi_keyvalue.replace(ctx);
236 }
237 }
238
239 let mut store = Store::new(engine, host);
240
241 store.data_mut().limits = self.run.store_limits();
242 store.limiter(|t| &mut t.limits);
243
244 if let Some(fuel) = self.run.common.wasm.fuel {
247 store.set_fuel(fuel)?;
248 }
249
250 Ok(store)
251 }
252
253 fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
254 let mut cli = self.run.common.wasi.cli;
255
256 if let Some(common) = self.run.common.wasi.common {
258 if cli.is_some() {
259 bail!(
260 "The -Scommon option should not be use with -Scli as it is a deprecated alias"
261 );
262 } else {
263 cli = Some(common);
266 }
267 }
268
269 if cli == Some(true) {
278 let link_options = self.run.compute_wasi_features();
279 wasmtime_wasi::p2::add_to_linker_with_options_async(linker, &link_options)?;
280 wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
281 } else {
282 wasmtime_wasi_http::add_to_linker_async(linker)?;
283 }
284
285 if self.run.common.wasi.nn == Some(true) {
286 #[cfg(not(feature = "wasi-nn"))]
287 {
288 bail!("support for wasi-nn was disabled at compile time");
289 }
290 #[cfg(feature = "wasi-nn")]
291 {
292 wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
293 let ctx = h.nn.as_mut().unwrap();
294 wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
295 })?;
296 }
297 }
298
299 if self.run.common.wasi.config == Some(true) {
300 #[cfg(not(feature = "wasi-config"))]
301 {
302 bail!("support for wasi-config was disabled at compile time");
303 }
304 #[cfg(feature = "wasi-config")]
305 {
306 wasmtime_wasi_config::add_to_linker(linker, |h| {
307 WasiConfig::from(h.wasi_config.as_ref().unwrap())
308 })?;
309 }
310 }
311
312 if self.run.common.wasi.keyvalue == Some(true) {
313 #[cfg(not(feature = "wasi-keyvalue"))]
314 {
315 bail!("support for wasi-keyvalue was disabled at compile time");
316 }
317 #[cfg(feature = "wasi-keyvalue")]
318 {
319 wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
320 WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
321 })?;
322 }
323 }
324
325 if self.run.common.wasi.threads == Some(true) {
326 bail!("support for wasi-threads is not available with components");
327 }
328
329 if self.run.common.wasi.http == Some(false) {
330 bail!("support for wasi-http must be enabled for `serve` subcommand");
331 }
332
333 Ok(())
334 }
335
    /// Runs the HTTP server until a shutdown is requested.
    ///
    /// Builds the engine and linker, pre-instantiates the component, binds
    /// the listening socket and then accepts connections in a loop, serving
    /// each one on its own Tokio task via `handle_request`. A shutdown is
    /// requested by ctrl-c or by a connection to `shutdown_addr`; after that
    /// the function waits for in-flight connection tasks (or a second ctrl-c).
    ///
    /// # Errors
    ///
    /// Returns an error if configuration, engine creation, linking, loading
    /// the component, binding a socket, or `accept` fails.
    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        // The pooling-allocator probe result is passed in as the default; a
        // failed probe is treated the same as "no preference".
        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);
        config.async_support(true);

        // Timeouts are enforced through the epoch callback installed by
        // `setup_epoch_handler`, so epochs must be enabled.
        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }
            // Guest profiling samples from the epoch callback as well.
            Some(Profile::Guest { .. }) => {
                config.epoch_interruption(true);
            }
            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        let instance = linker.instantiate_pre(&component)?;
        let instance = ProxyPre::new(instance)?;

        // Shutdown can be requested by ctrl-c ...
        let shutdown = Arc::new(GracefulShutdown::default());
        tokio::task::spawn({
            let shutdown = shutdown.clone();
            async move {
                tokio::signal::ctrl_c().await.unwrap();
                shutdown.requested.notify_one();
            }
        });
        // ... or, when configured, by the first connection (or accept error)
        // on the dedicated shutdown listener.
        if let Some(addr) = self.shutdown_addr {
            let listener = tokio::net::TcpListener::bind(addr).await?;
            eprintln!(
                "Listening for shutdown on tcp://{}/",
                listener.local_addr()?
            );
            let shutdown = shutdown.clone();
            tokio::task::spawn(async move {
                let _ = listener.accept().await;
                shutdown.requested.notify_one();
            });
        }

        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Address reuse is enabled on every platform except Windows.
        // NOTE(review): presumably because the option has different
        // semantics there — confirm.
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        log::info!("Listening on {}", self.addr);

        // `self` moves into the handler, which every connection task shares.
        let handler = ProxyHandler::new(self, engine, instance);

        loop {
            // Accept the next connection unless a shutdown was requested.
            let (stream, _) = tokio::select! {
                _ = shutdown.requested.notified() => break,
                v = listener.accept() => v?,
            };
            let comp = component.clone();
            let stream = TokioIo::new(stream);
            let h = handler.clone();
            // Counts this connection as active until the guard is dropped.
            let shutdown_guard = shutdown.clone().increment();
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| {
                            let comp = comp.clone();
                            let h = h.clone();
                            async move {
                                use http_body_util::{BodyExt, Full};
                                // `Full`'s error type is `Infallible`; this only
                                // adapts it to the body's `ErrorCode` error type.
                                fn to_errorcode(_: Infallible) -> ErrorCode {
                                    unreachable!()
                                }
                                match handle_request(h, req, comp).await {
                                    Ok(r) => Ok::<_, Infallible>(r),
                                    // Any handler failure is reported on stderr
                                    // and answered with a static 500 page.
                                    Err(e) => {
                                        eprintln!("error: {e:?}");
                                        let error_html = "\
<!doctype html>
<html>
<head>
 <title>500 Internal Server Error</title>
</head>
<body>
 <center>
 <h1>500 Internal Server Error</h1>
 <hr>
 wasmtime
 </center>
</body>
</html>";
                                        Ok(Response::builder()
                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
                                            .header("Content-Type", "text/html; charset=UTF-8")
                                            .body(
                                                Full::new(bytes::Bytes::from(error_html))
                                                    .map_err(to_errorcode)
                                                    .boxed(),
                                            )
                                            .unwrap())
                                    }
                                }
                            }
                        }),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
                drop(shutdown_guard);
            });
        }

        // `close` reports whether no connection task is still active; if so
        // there is nothing to wait for.
        if shutdown.close() {
            return Ok(());
        }
        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = shutdown.complete.notified() => {}
        }

        Ok(())
    }
500}
501
/// Shared bookkeeping used by `ServeCommand::serve` to shut down gracefully.
#[derive(Default)]
struct GracefulShutdown {
    /// Notified when a shutdown is requested (ctrl-c or the shutdown
    /// listener); the accept loop in `serve` waits on it.
    requested: Notify,
    /// Notified by the last active task's guard once `close` has been called.
    complete: Notify,
    /// Active-task counter and "closing" flag, guarded by a mutex.
    state: Mutex<GracefulShutdownState>,
}
513
/// Mutable part of [`GracefulShutdown`], kept behind its mutex.
#[derive(Default)]
struct GracefulShutdownState {
    /// Number of guards returned by `GracefulShutdown::increment` that have
    /// not been dropped yet.
    active_tasks: u32,
    /// Set by `GracefulShutdown::close`; once set, the guard that brings
    /// `active_tasks` to zero notifies `complete`.
    notify_when_done: bool,
}
519
520impl GracefulShutdown {
521 fn increment(self: Arc<Self>) -> impl Drop {
523 struct Guard(Arc<GracefulShutdown>);
524
525 let mut state = self.state.lock().unwrap();
526 assert!(!state.notify_when_done);
527 state.active_tasks += 1;
528 drop(state);
529
530 return Guard(self);
531
532 impl Drop for Guard {
533 fn drop(&mut self) {
534 let mut state = self.0.state.lock().unwrap();
535 state.active_tasks -= 1;
536 if state.notify_when_done && state.active_tasks == 0 {
537 self.0.complete.notify_one();
538 }
539 }
540 }
541 }
542
543 fn close(&self) -> bool {
546 let mut state = self.state.lock().unwrap();
547 state.notify_when_done = true;
548 state.active_tasks == 0
549 }
550}
551
/// Tick period handed to `EpochThread::spawn` by `setup_epoch_handler` when a
/// wasm timeout is configured and guest profiling is not in use.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
556
557struct EpochThread {
558 shutdown: Arc<AtomicBool>,
559 handle: Option<std::thread::JoinHandle<()>>,
560}
561
562impl EpochThread {
563 fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
564 let shutdown = Arc::new(AtomicBool::new(false));
565 let handle = {
566 let shutdown = Arc::clone(&shutdown);
567 let handle = std::thread::spawn(move || {
568 while !shutdown.load(Ordering::Relaxed) {
569 std::thread::sleep(interval);
570 engine.increment_epoch();
571 }
572 });
573 Some(handle)
574 };
575
576 EpochThread { shutdown, handle }
577 }
578}
579
580impl Drop for EpochThread {
581 fn drop(&mut self) {
582 if let Some(handle) = self.handle.take() {
583 self.shutdown.store(true, Ordering::Relaxed);
584 handle.join().unwrap();
585 }
586 }
587}
588
/// One-shot callback returned by `setup_epoch_handler`; `handle_request`
/// calls it with the store after the guest handler has run. It writes the
/// guest profile when guest profiling is active and is a no-op otherwise.
type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;
590
591fn setup_epoch_handler(
592 cmd: &ServeCommand,
593 store: &mut Store<Host>,
594 component: Component,
595) -> Result<(WriteProfile, Option<EpochThread>)> {
596 if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
598 #[cfg(feature = "profiling")]
599 return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
600 #[cfg(not(feature = "profiling"))]
601 {
602 let _ = (path, interval);
603 bail!("support for profiling disabled at compile time!");
604 }
605 }
606
607 let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
609 let start = Instant::now();
610 store.epoch_deadline_callback(move |_store| {
611 if start.elapsed() > timeout {
612 bail!("Timeout expired");
613 }
614 Ok(UpdateDeadline::Continue(1))
615 });
616 store.set_epoch_deadline(1);
617 let engine = store.engine().clone();
618 Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
619 } else {
620 None
621 };
622
623 Ok((Box::new(|_store| {}), epoch_thread))
624}
625
/// Installs a guest profiler on `store` and starts the thread that drives it.
///
/// A `GuestProfiler` is stored in the host state and fed from two places: the
/// store's call hook and the epoch-deadline callback. The epoch callback also
/// enforces the optional wasm timeout. An [`EpochThread`] ticking every
/// `interval` is started so the epoch callback actually fires.
///
/// Returns a callback that writes the collected profile to `path` (reporting
/// success or failure on stderr) together with the epoch thread.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        module_name,
        interval,
        component,
        std::iter::empty(),
    )));

    // Temporarily takes the profiler out of the host state so it can be
    // borrowed mutably while the store is borrowed immutably, then puts it
    // back. Panics if the `Arc` has other owners.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Forward every call-hook event to the profiler.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    // Each epoch tick takes a sample, then checks the optional timeout.
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        if let Some(timeout) = timeout {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    // Reclaims sole ownership of the profiler and serializes it to `path`;
    // failures are reported on stderr rather than returned.
    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok((write_profile, epoch_thread))
}
703
704struct ProxyHandlerInner {
705 cmd: ServeCommand,
706 engine: Engine,
707 instance_pre: ProxyPre<Host>,
708 next_id: AtomicU64,
709}
710
711impl ProxyHandlerInner {
712 fn next_req_id(&self) -> u64 {
713 self.next_id.fetch_add(1, Ordering::Relaxed)
714 }
715}
716
717#[derive(Clone)]
718struct ProxyHandler(Arc<ProxyHandlerInner>);
719
720impl ProxyHandler {
721 fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
722 Self(Arc::new(ProxyHandlerInner {
723 cmd,
724 engine,
725 instance_pre,
726 next_id: AtomicU64::from(0),
727 }))
728 }
729}
730
/// Incoming HTTP request type accepted by `handle_request`.
type Request = hyper::Request<hyper::body::Incoming>;
732
733async fn handle_request(
734 ProxyHandler(inner): ProxyHandler,
735 req: Request,
736 component: Component,
737) -> Result<hyper::Response<HyperOutgoingBody>> {
738 let (sender, receiver) = tokio::sync::oneshot::channel();
739
740 let req_id = inner.next_req_id();
741
742 log::info!(
743 "Request {req_id} handling {} to {}",
744 req.method(),
745 req.uri()
746 );
747
748 let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
749
750 let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
751 let out = store.data_mut().new_response_outparam(sender)?;
752 let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
753
754 let comp = component.clone();
755 let task = tokio::task::spawn(async move {
756 let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;
757
758 if let Err(e) = proxy
759 .wasi_http_incoming_handler()
760 .call_handle(&mut store, req, out)
761 .await
762 {
763 log::error!("[{req_id}] :: {:?}", e);
764 return Err(e);
765 }
766
767 write_profile(&mut store);
768 drop(epoch_thread);
769
770 Ok(())
771 });
772
773 let result = match receiver.await {
774 Ok(Ok(resp)) => Ok(resp),
775 Ok(Err(e)) => Err(e.into()),
776 Err(_) => {
777 let e = match task.await {
785 Ok(Ok(())) => {
786 bail!("guest never invoked `response-outparam::set` method")
787 }
788 Ok(Err(e)) => e,
789 Err(e) => e.into(),
790 };
791 Err(e.context("guest never invoked `response-outparam::set` method"))
792 }
793 };
794
795 result
796}
797
798#[derive(Clone)]
799enum Output {
800 Stdout,
801 Stderr,
802}
803
804impl Output {
805 fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
806 use std::io::Write;
807
808 match self {
809 Output::Stdout => std::io::stdout().write_all(buf),
810 Output::Stderr => std::io::stderr().write_all(buf),
811 }
812 .map_err(|e| anyhow!(e))
813 }
814}
815
816#[derive(Clone)]
817struct LogStream {
818 prefix: String,
819 output: Output,
820 needs_prefix_on_next_write: bool,
821}
822
823impl LogStream {
824 fn new(prefix: String, output: Output) -> LogStream {
825 LogStream {
826 prefix,
827 output,
828 needs_prefix_on_next_write: true,
829 }
830 }
831}
832
833impl wasmtime_wasi::p2::StdoutStream for LogStream {
834 fn stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
835 Box::new(self.clone())
836 }
837
838 fn isatty(&self) -> bool {
839 use std::io::IsTerminal;
840
841 match &self.output {
842 Output::Stdout => std::io::stdout().is_terminal(),
843 Output::Stderr => std::io::stderr().is_terminal(),
844 }
845 }
846}
847
848impl wasmtime_wasi::p2::OutputStream for LogStream {
849 fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
850 let mut bytes = &bytes[..];
851
852 while !bytes.is_empty() {
853 if self.needs_prefix_on_next_write {
854 self.output
855 .write_all(self.prefix.as_bytes())
856 .map_err(StreamError::LastOperationFailed)?;
857 self.needs_prefix_on_next_write = false;
858 }
859 match bytes.iter().position(|b| *b == b'\n') {
860 Some(i) => {
861 let (a, b) = bytes.split_at(i + 1);
862 bytes = b;
863 self.output
864 .write_all(a)
865 .map_err(StreamError::LastOperationFailed)?;
866 self.needs_prefix_on_next_write = true;
867 }
868 None => {
869 self.output
870 .write_all(bytes)
871 .map_err(StreamError::LastOperationFailed)?;
872 break;
873 }
874 }
875 }
876
877 Ok(())
878 }
879
880 fn flush(&mut self) -> StreamResult<()> {
881 Ok(())
882 }
883
884 fn check_write(&mut self) -> StreamResult<usize> {
885 Ok(1024 * 1024)
886 }
887}
888
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    /// Completes immediately: the body is empty, so there is never anything
    /// to wait for.
    async fn ready(&mut self) {}
}
893
894fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
916 use wasmtime::{Config, Memory, MemoryType};
917 const BITS_TO_TEST: u32 = 42;
918 let mut config = Config::new();
919 config.wasm_memory64(true);
920 config.memory_reservation(1 << BITS_TO_TEST);
921 let engine = Engine::new(&config)?;
922 let mut store = Store::new(&engine, ());
923 let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
926 if Memory::new(&mut store, ty).is_ok() {
927 Ok(Some(true))
928 } else {
929 Ok(None)
930 }
931}