1#![allow(clippy::type_complexity)]
2
3use core::iter;
4use core::ops::Bound;
5use core::pin::pin;
6use core::time::Duration;
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11use anyhow::{anyhow, bail, Context as _};
12use clap::Parser;
13use futures::StreamExt as _;
14use tokio::fs;
15use tokio::sync::Mutex;
16use tokio::task::JoinSet;
17use tracing::{error, info, instrument, warn, Instrument as _, Span};
18use url::Url;
19use wasi_preview1_component_adapter_provider::{
20 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
21 WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
22};
23use wasmtime::component::{types, Component, InstancePre, Linker, ResourceTable, ResourceType};
24use wasmtime::{Engine, Store};
25use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
26use wasmtime_wasi_http::p2::{WasiHttpCtxView, WasiHttpView};
27use wasmtime_wasi_http::WasiHttpCtx;
28use wrpc_runtime_wasmtime::{
29 collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
30 RemoteResource, ServeExt as _, SharedResourceTable, WrpcCtxView, WrpcView,
31};
32use wrpc_transport::{Invoke, Serve};
33
34mod nats;
35mod tcp;
36
37const DEFAULT_TIMEOUT: &str = "10s";
38
39#[derive(Parser, Debug)]
40#[command(author, version, about, long_about = None)]
41enum Command {
42 #[command(subcommand)]
43 Nats(nats::Command),
44 #[command(subcommand)]
45 Tcp(tcp::Command),
46}
47
48pub enum Workload {
49 Url(Url),
50 Binary(Vec<u8>),
51}
52
53pub struct WrpcCtx<C: Invoke> {
54 pub wrpc: C,
55 pub cx: C::Context,
56 pub shared_resources: SharedResourceTable,
57 pub timeout: Duration,
58}
59
60pub struct Ctx<C: Invoke> {
61 pub table: ResourceTable,
62 pub wasi: WasiCtx,
63 pub http: WasiHttpCtx,
64 pub wrpc: WrpcCtx<C>,
65}
66
67impl<C> wrpc_runtime_wasmtime::WrpcCtx<C> for WrpcCtx<C>
68where
69 C: Invoke,
70 C::Context: Clone,
71{
72 fn context(&self) -> C::Context {
73 self.cx.clone()
74 }
75
76 fn client(&self) -> &C {
77 &self.wrpc
78 }
79
80 fn shared_resources(&mut self) -> &mut SharedResourceTable {
81 &mut self.shared_resources
82 }
83
84 fn timeout(&self) -> Option<Duration> {
85 Some(self.timeout)
86 }
87}
88
89impl<C> WrpcView for Ctx<C>
90where
91 C: Invoke,
92 C::Context: Clone,
93{
94 type Invoke = C;
95
96 fn wrpc(&mut self) -> WrpcCtxView<'_, Self::Invoke> {
97 WrpcCtxView {
98 ctx: &mut self.wrpc,
99 table: &mut self.table,
100 }
101 }
102}
103
104impl<C: Invoke> WasiView for Ctx<C> {
105 fn ctx(&mut self) -> WasiCtxView<'_> {
106 WasiCtxView {
107 ctx: &mut self.wasi,
108 table: &mut self.table,
109 }
110 }
111}
112
113impl<C: Invoke> WasiHttpView for Ctx<C> {
114 fn http(&mut self) -> WasiHttpCtxView<'_> {
115 WasiHttpCtxView {
116 ctx: &mut self.http,
117 table: &mut self.table,
118 hooks: wasmtime_wasi_http::p2::default_hooks(),
119 }
120 }
121}
122
123fn use_pooling_allocator_by_default() -> anyhow::Result<Option<bool>> {
125 const BITS_TO_TEST: u32 = 42;
126 let mut config = wasmtime::Config::new();
127 config.wasm_memory64(true);
128 config.memory_reservation(1 << BITS_TO_TEST);
129 let engine = wasmtime::Engine::new(&config)?;
130 let mut store = wasmtime::Store::new(&engine, ());
131 let ty = wasmtime::MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
134 if wasmtime::Memory::new(&mut store, ty).is_ok() {
135 Ok(Some(true))
136 } else {
137 Ok(None)
138 }
139}
140
141fn is_0_2(version: &str, min_patch: u64) -> bool {
142 if let Ok(semver::Version {
143 major,
144 minor,
145 patch,
146 pre,
147 build,
148 }) = version.parse()
149 {
150 major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
151 } else {
152 false
153 }
154}
155
156#[instrument(level = "trace", skip(adapter))]
157async fn instantiate_pre<C>(
158 adapter: &[u8],
159 workload: &str,
160) -> anyhow::Result<(
161 InstancePre<Ctx<C>>,
162 Engine,
163 Arc<[ResourceType]>,
164 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
165)>
166where
167 C: Invoke + Clone + 'static,
168 C::Context: Clone + 'static,
169{
170 let mut opts = wasmtime_cli_flags::CommonOptions::try_parse_from(iter::empty::<&'static str>())
171 .context("failed to construct common Wasmtime options")?;
172 let mut config = opts
173 .config(use_pooling_allocator_by_default().unwrap_or(None))
174 .map_err(anyhow::Error::from)
175 .context("failed to construct Wasmtime config")?;
176 config.wasm_component_model(true);
177 let engine = wasmtime::Engine::new(&config)
178 .map_err(anyhow::Error::from)
179 .context("failed to initialize Wasmtime engine")?;
180
181 let wasm = if workload.starts_with('.') || workload.starts_with('/') {
182 fs::read(&workload)
183 .await
184 .with_context(|| format!("failed to read relative path to workload `{workload}`"))
185 .map(Workload::Binary)
186 } else {
187 Url::parse(workload)
188 .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
189 .map(Workload::Url)
190 }?;
191 let wasm = match wasm {
192 Workload::Url(wasm) => match wasm.scheme() {
193 "file" => {
194 let wasm = wasm
195 .to_file_path()
196 .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
197 fs::read(wasm)
198 .await
199 .context("failed to read Wasm from file URL")?
200 }
201 "http" | "https" => {
202 let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
203 let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
204 wasm.to_vec()
205 }
206 scheme => bail!("URL scheme `{scheme}` not supported"),
207 },
208 Workload::Binary(wasm) => wasm,
209 };
210 let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
211 wit_component::ComponentEncoder::default()
212 .validate(true)
213 .module(&wasm)
214 .context("failed to set core component module")?
215 .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
216 .context("failed to add WASI adapter")?
217 .encode()
218 .context("failed to encode a component")?
219 } else {
220 wasm
221 };
222
223 let component = Component::new(&engine, wasm)
224 .map_err(anyhow::Error::from)
225 .context("failed to compile component")?;
226
227 let mut linker = Linker::<Ctx<C>>::new(&engine);
228 wasmtime_wasi::p2::add_to_linker_async(&mut linker)
229 .map_err(anyhow::Error::from)
230 .context("failed to link WASI")?;
231 wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker)
232 .map_err(anyhow::Error::from)
233 .context("failed to link `wasi:http`")?;
234 wrpc_runtime_wasmtime::rpc::add_to_linker(&mut linker)
235 .map_err(anyhow::Error::from)
236 .context("failed to link `wrpc:rpc`")?;
237
238 let ty = component.component_type();
239 let mut host_resources = BTreeMap::default();
240 let mut guest_resources = Vec::new();
241 collect_component_resource_imports(&engine, &ty, &mut host_resources);
242 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
243 let io_err_tys = host_resources
244 .range::<str, _>((
245 Bound::Included("wasi:io/error@0.2"),
246 Bound::Excluded("wasi:io/error@0.3"),
247 ))
248 .flat_map(|(_, instance)| instance.get("error"))
249 .copied()
250 .collect::<Box<[_]>>();
251 let io_pollable_tys = host_resources
252 .range::<str, _>((
253 Bound::Included("wasi:io/poll@0.2"),
254 Bound::Excluded("wasi:io/poll@0.3"),
255 ))
256 .flat_map(|(_, instance)| instance.get("pollable"))
257 .copied()
258 .collect::<Box<[_]>>();
259 let io_input_stream_tys = host_resources
260 .range::<str, _>((
261 Bound::Included("wasi:io/streams@0.2"),
262 Bound::Excluded("wasi:io/streams@0.3"),
263 ))
264 .flat_map(|(_, instance)| instance.get("input-stream"))
265 .copied()
266 .collect::<Box<[_]>>();
267 let io_output_stream_tys = host_resources
268 .range::<str, _>((
269 Bound::Included("wasi:io/streams@0.2"),
270 Bound::Excluded("wasi:io/streams@0.3"),
271 ))
272 .flat_map(|(_, instance)| instance.get("output-stream"))
273 .copied()
274 .collect::<Box<[_]>>();
275 let rpc_err_ty = host_resources
276 .get("wrpc:rpc/error@0.1.0")
277 .and_then(|instance| instance.get("error"))
278 .copied();
279 let host_resources = host_resources
281 .into_iter()
282 .map(|(name, instance)| {
283 let instance = instance
284 .into_iter()
285 .map(|(name, ty)| {
286 let host_ty = match ty {
287 ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
288 ty if io_err_tys.contains(&ty) => {
289 ResourceType::host::<wasmtime_wasi::p2::bindings::io::error::Error>()
290 }
291 ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
292 wasmtime_wasi::p2::bindings::io::streams::InputStream,
293 >(),
294 ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
295 wasmtime_wasi::p2::bindings::io::streams::OutputStream,
296 >(),
297 ty if io_pollable_tys.contains(&ty) => {
298 ResourceType::host::<wasmtime_wasi::p2::bindings::io::poll::Pollable>()
299 }
300 _ => ResourceType::host::<RemoteResource>(),
301 };
302 (name, (ty, host_ty))
303 })
304 .collect::<HashMap<_, _>>();
305 (name, instance)
306 })
307 .collect::<HashMap<_, _>>();
308 let host_resources = Arc::from(host_resources);
309 let guest_resources = Arc::from(guest_resources);
310 for (name, item) in ty.imports(&engine) {
311 match name.split_once('/').map(|(pkg, suffix)| {
313 suffix
314 .split_once('@')
315 .map_or((pkg, suffix, None), |(iface, version)| {
316 (pkg, iface, Some(version))
317 })
318 }) {
319 Some(("wrpc:rpc", "transport" | "error" | "context" | "invoker", Some("0.1.0"))) => {}
320 Some((
321 "wasi:cli",
322 "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
323 | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
324 Some(version),
325 )) if is_0_2(version, 0) => {}
326 Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
327 if is_0_2(version, 0) => {}
328 Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
329 Some(("wasi:filesystem", "preopens" | "types", Some(version)))
330 if is_0_2(version, 0) => {}
331 Some((
332 "wasi:http",
333 "incoming-handler" | "outgoing-handler" | "types",
334 Some(version),
335 )) if is_0_2(version, 0) => {}
336 Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
337 if is_0_2(version, 0) => {}
338 Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
339 if is_0_2(version, 0) => {}
340 Some((
341 "wasi:sockets",
342 "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
343 | "udp-create-socket" | "udp",
344 Some(version),
345 )) if is_0_2(version, 0) => {}
346 _ => {
347 if let Err(err) = link_item(
348 &engine,
349 &mut linker.root(),
350 Arc::clone(&guest_resources),
351 Arc::clone(&host_resources),
352 item,
353 "",
354 name,
355 ) {
356 error!(?err, "failed to polyfill instance");
357 }
358 }
359 }
360 }
361
362 let pre = linker
363 .instantiate_pre(&component)
364 .map_err(anyhow::Error::from)
365 .context("failed to pre-instantiate component")?;
366 Ok((pre, engine, guest_resources, host_resources))
367}
368
369fn new_store<C: Invoke>(
370 engine: &Engine,
371 wrpc: C,
372 cx: C::Context,
373 arg0: &str,
374 timeout: Duration,
375) -> wasmtime::Store<Ctx<C>> {
376 Store::new(
377 engine,
378 Ctx {
379 wasi: WasiCtxBuilder::new()
380 .inherit_env()
381 .inherit_stdio()
382 .inherit_network()
383 .allow_ip_name_lookup(true)
384 .allow_tcp(true)
385 .allow_udp(true)
386 .args(&[arg0])
387 .build(),
388 http: WasiHttpCtx::new(),
389 table: ResourceTable::new(),
390 wrpc: WrpcCtx {
391 wrpc,
392 cx,
393 shared_resources: SharedResourceTable::default(),
394 timeout,
395 },
396 },
397 )
398}
399
400#[instrument(level = "trace", skip(clt, cx), ret(level = "trace"))]
401pub async fn handle_run<C>(
402 clt: C,
403 cx: C::Context,
404 timeout: Duration,
405 workload: &str,
406) -> anyhow::Result<()>
407where
408 C: Invoke + Clone + 'static,
409 C::Context: Clone + 'static,
410{
411 let (pre, engine, _, _) =
412 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, workload).await?;
413 let mut store = new_store(&engine, clt, cx, "command.wasm", timeout);
414 let cmd = wasmtime_wasi::p2::bindings::CommandPre::new(pre)
415 .map_err(anyhow::Error::from)
416 .context("failed to construct `command` instance")?
417 .instantiate_async(&mut store)
418 .await
419 .map_err(anyhow::Error::from)
420 .context("failed to instantiate `command`")?;
421 cmd.wasi_cli_run()
422 .call_run(&mut store)
423 .await
424 .map_err(anyhow::Error::from)
425 .context("failed to run component")?
426 .map_err(|()| anyhow!("component failed"))
427}
428
429#[instrument(level = "trace", skip_all, ret(level = "trace"))]
430pub async fn serve_shared<C, S>(
431 handlers: &mut JoinSet<()>,
432 srv: S,
433 mut store: wasmtime::Store<Ctx<C>>,
434 pre: InstancePre<Ctx<C>>,
435 guest_resources: Arc<[ResourceType]>,
436 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
437) -> anyhow::Result<()>
438where
439 C: Invoke + 'static,
440 C::Context: Clone,
441 S: Serve,
442{
443 let span = Span::current();
444 let instance = pre
445 .instantiate_async(&mut store)
446 .await
447 .map_err(anyhow::Error::from)
448 .context("failed to instantiate component")?;
449 let engine = store.engine().clone();
450 let io_stream_resources: Arc<[ResourceType]> =
451 wrpc_runtime_wasmtime::paths::wasi_io_stream_resources(
452 &engine,
453 &pre.component().component_type(),
454 )
455 .into();
456 let store = Arc::new(Mutex::new(store));
457 for (name, ty) in pre.component().component_type().exports(&engine) {
458 match (name, ty) {
459 (name, types::ComponentItem::ComponentFunc(ty)) => {
460 info!(?name, "serving root function");
461 let invocations = srv
462 .serve_function_shared(
463 Arc::clone(&store),
464 instance,
465 Arc::clone(&guest_resources),
466 Arc::clone(&host_resources),
467 Arc::clone(&io_stream_resources),
468 ty,
469 "",
470 name,
471 )
472 .await?;
473 handlers.spawn(
474 async move {
475 let mut invocations = pin!(invocations);
476 while let Some(invocation) = invocations.next().await {
477 match invocation {
478 Ok((_, fut)) => {
479 info!("serving root function invocation");
480 if let Err(err) = fut.await {
481 warn!(?err, "failed to serve root function invocation");
482 } else {
483 info!("successfully served root function invocation");
484 }
485 }
486 Err(err) => {
487 error!(?err, "failed to accept root function invocation");
488 }
489 }
490 }
491 }
492 .instrument(span.clone()),
493 );
494 }
495 (_, types::ComponentItem::CoreFunc(_)) => {
496 warn!(name, "serving root core function exports not supported yet");
497 }
498 (_, types::ComponentItem::Module(_)) => {
499 warn!(name, "serving root module exports not supported yet");
500 }
501 (_, types::ComponentItem::Component(_)) => {
502 warn!(name, "serving root component exports not supported yet");
503 }
504 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
505 for (name, ty) in ty.exports(&engine) {
506 match ty {
507 types::ComponentItem::ComponentFunc(ty) => {
508 info!(?name, "serving instance function");
509 let invocations = srv
510 .serve_function_shared(
511 Arc::clone(&store),
512 instance,
513 Arc::clone(&guest_resources),
514 Arc::clone(&host_resources),
515 Arc::clone(&io_stream_resources),
516 ty,
517 instance_name,
518 name,
519 )
520 .await?;
521 handlers.spawn(async move {
522 let mut invocations = pin!(invocations);
523 while let Some(invocation) = invocations.next().await {
524 match invocation {
525 Ok((_, fut)) => {
526 info!("serving instance function invocation");
527 if let Err(err) = fut.await {
528 warn!(
529 ?err,
530 "failed to serve instance function invocation"
531 );
532 } else {
533 info!(
534 "successfully served instance function invocation"
535 );
536 }
537 }
538 Err(err) => {
539 error!(
540 ?err,
541 "failed to accept instance function invocation"
542 );
543 }
544 }
545 }
546 }
547 .instrument(span.clone()));
548 }
549 types::ComponentItem::CoreFunc(_) => {
550 warn!(
551 instance_name,
552 name, "serving instance core function exports not supported yet"
553 );
554 }
555 types::ComponentItem::Module(_) => {
556 warn!(
557 instance_name,
558 name, "serving instance module exports not supported yet"
559 );
560 }
561 types::ComponentItem::Component(_) => {
562 warn!(
563 instance_name,
564 name, "serving instance component exports not supported yet"
565 );
566 }
567 types::ComponentItem::ComponentInstance(_) => {
568 warn!(
569 instance_name,
570 name, "serving nested instance exports not supported yet"
571 );
572 }
573 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
574 }
575 }
576 }
577 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
578 }
579 }
580 Ok(())
581}
582
583#[instrument(level = "trace", skip_all, ret(level = "trace"))]
584#[allow(clippy::too_many_arguments)]
585pub async fn serve_stateless<C, S>(
586 handlers: &mut JoinSet<()>,
587 srv: S,
588 clt: C,
589 cx: C::Context,
590 pre: InstancePre<Ctx<C>>,
591 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
592 engine: &Engine,
593 timeout: Duration,
594) -> anyhow::Result<()>
595where
596 C: Invoke + Clone + 'static,
597 C::Context: Clone + 'static,
598 S: Serve,
599{
600 let span = Span::current();
601 for (name, ty) in pre.component().component_type().exports(engine) {
602 match (name, ty) {
603 (name, types::ComponentItem::ComponentFunc(ty)) => {
604 let clt = clt.clone();
605 let cx = cx.clone();
606 let engine = engine.clone();
607 info!(?name, "serving root function");
608 let invocations = srv
609 .serve_function(
610 move || {
611 new_store(&engine, clt.clone(), cx.clone(), "reactor.wasm", timeout)
612 },
613 pre.clone(),
614 Arc::clone(&host_resources),
615 ty,
616 "",
617 name,
618 )
619 .await?;
620 handlers.spawn(
621 async move {
622 let mut invocations = pin!(invocations);
623 while let Some(invocation) = invocations.next().await {
624 match invocation {
625 Ok((_, fut)) => {
626 info!("serving root function invocation");
627 if let Err(err) = fut.await {
628 warn!(?err, "failed to serve root function invocation");
629 } else {
630 info!("successfully served root function invocation");
631 }
632 }
633 Err(err) => {
634 error!(?err, "failed to accept root function invocation");
635 }
636 }
637 }
638 }
639 .instrument(span.clone()),
640 );
641 }
642 (_, types::ComponentItem::CoreFunc(_)) => {
643 warn!(name, "serving root core function exports not supported yet");
644 }
645 (_, types::ComponentItem::Module(_)) => {
646 warn!(name, "serving root module exports not supported yet");
647 }
648 (_, types::ComponentItem::Component(_)) => {
649 warn!(name, "serving root component exports not supported yet");
650 }
651 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
652 for (name, ty) in ty.exports(engine) {
653 match ty {
654 types::ComponentItem::ComponentFunc(ty) => {
655 let clt = clt.clone();
656 let engine = engine.clone();
657 let cx = cx.clone();
658 info!(?name, "serving instance function");
659 let invocations = srv
660 .serve_function(
661 move || {
662 new_store(
663 &engine,
664 clt.clone(),
665 cx.clone(),
666 "reactor.wasm",
667 timeout,
668 )
669 },
670 pre.clone(),
671 Arc::clone(&host_resources),
672 ty,
673 instance_name,
674 name,
675 )
676 .await?;
677 handlers.spawn(async move {
678 let mut invocations = pin!(invocations);
679 while let Some(invocation) = invocations.next().await {
680 match invocation {
681 Ok((_, fut)) => {
682 info!("serving instance function invocation");
683 if let Err(err) = fut.await {
684 warn!(
685 ?err,
686 "failed to serve instance function invocation"
687 );
688 } else {
689 info!(
690 "successfully served instance function invocation"
691 );
692 }
693 }
694 Err(err) => {
695 error!(
696 ?err,
697 "failed to accept instance function invocation"
698 );
699 }
700 }
701 }
702 }.instrument(span.clone()));
703 }
704 types::ComponentItem::CoreFunc(_) => {
705 warn!(
706 instance_name,
707 name, "serving instance core function exports not supported yet"
708 );
709 }
710 types::ComponentItem::Module(_) => {
711 warn!(
712 instance_name,
713 name, "serving instance module exports not supported yet"
714 );
715 }
716 types::ComponentItem::Component(_) => {
717 warn!(
718 instance_name,
719 name, "serving instance component exports not supported yet"
720 );
721 }
722 types::ComponentItem::ComponentInstance(_) => {
723 warn!(
724 instance_name,
725 name, "serving nested instance exports not supported yet"
726 );
727 }
728 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
729 }
730 }
731 }
732 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
733 }
734 }
735 Ok(())
736}
737
738#[instrument(level = "trace", skip(srv, clt, cx), ret(level = "trace"))]
739pub async fn handle_serve<C, S>(
740 srv: S,
741 clt: C,
742 cx: C::Context,
743 timeout: Duration,
744 workload: &str,
745) -> anyhow::Result<()>
746where
747 C: Invoke + Clone + 'static,
748 C::Context: Clone + 'static,
749 S: Serve,
750{
751 let (pre, engine, guest_resources, host_resources) =
752 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, workload).await?;
753
754 let mut handlers = JoinSet::new();
755 if guest_resources.is_empty() {
756 serve_stateless(
757 &mut handlers,
758 srv,
759 clt,
760 cx,
761 pre,
762 host_resources,
763 &engine,
764 timeout,
765 )
766 .await?;
767 } else {
768 serve_shared(
769 &mut handlers,
770 srv,
771 new_store(&engine, clt, cx, "reactor.wasm", timeout),
772 pre,
773 guest_resources,
774 host_resources,
775 )
776 .await?;
777 }
778 while let Some(res) = handlers.join_next().await {
779 if let Err(err) = res {
780 error!(?err, "handler failed");
781 }
782 }
783 Ok(())
784}
785
786#[instrument(level = "trace", ret(level = "trace"))]
787pub async fn run() -> anyhow::Result<()> {
788 wrpc_cli::tracing::init();
789 match Command::parse() {
790 Command::Nats(args) => nats::run(args).await,
791 Command::Tcp(args) => tcp::run(args).await,
792 }
793}