1#![allow(clippy::type_complexity)]
2
3use core::iter;
4use core::ops::Bound;
5use core::pin::pin;
6use core::time::Duration;
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11use anyhow::{anyhow, bail, Context as _};
12use clap::Parser;
13use futures::StreamExt as _;
14use tokio::fs;
15use tokio::sync::Mutex;
16use tokio::task::JoinSet;
17use tracing::{error, info, instrument, warn, Instrument as _, Span};
18use url::Url;
19use wasi_preview1_component_adapter_provider::{
20 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
21 WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
22};
23use wasmtime::component::{types, Component, InstancePre, Linker, ResourceTable, ResourceType};
24use wasmtime::{Engine, Store};
25use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
26use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
27use wrpc_runtime_wasmtime::{
28 collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
29 RemoteResource, ServeExt as _, SharedResourceTable, WrpcCtxView, WrpcView,
30};
31use wrpc_transport::{Invoke, Serve};
32
33mod nats;
34mod tcp;
35
/// Default timeout, written as a human-readable duration string.
// NOTE(review): nothing in this file reads this constant; presumably the
// `nats` / `tcp` submodules use it as a CLI default — confirm.
const DEFAULT_TIMEOUT: &str = "10s";
37
38#[derive(Parser, Debug)]
39#[command(author, version, about, long_about = None)]
40enum Command {
41 #[command(subcommand)]
42 Nats(nats::Command),
43 #[command(subcommand)]
44 Tcp(tcp::Command),
45}
46
47pub enum Workload {
48 Url(Url),
49 Binary(Vec<u8>),
50}
51
52pub struct WrpcCtx<C: Invoke> {
53 pub wrpc: C,
54 pub cx: C::Context,
55 pub shared_resources: SharedResourceTable,
56 pub timeout: Duration,
57}
58
59pub struct Ctx<C: Invoke> {
60 pub table: ResourceTable,
61 pub wasi: WasiCtx,
62 pub http: WasiHttpCtx,
63 pub wrpc: WrpcCtx<C>,
64}
65
66impl<C> wrpc_runtime_wasmtime::WrpcCtx<C> for WrpcCtx<C>
67where
68 C: Invoke,
69 C::Context: Clone,
70{
71 fn context(&self) -> C::Context {
72 self.cx.clone()
73 }
74
75 fn client(&self) -> &C {
76 &self.wrpc
77 }
78
79 fn shared_resources(&mut self) -> &mut SharedResourceTable {
80 &mut self.shared_resources
81 }
82
83 fn timeout(&self) -> Option<Duration> {
84 Some(self.timeout)
85 }
86}
87
88impl<C> WrpcView for Ctx<C>
89where
90 C: Invoke,
91 C::Context: Clone,
92{
93 type Invoke = C;
94
95 fn wrpc(&mut self) -> WrpcCtxView<'_, Self::Invoke> {
96 WrpcCtxView {
97 ctx: &mut self.wrpc,
98 table: &mut self.table,
99 }
100 }
101}
102
103impl<C: Invoke> WasiView for Ctx<C> {
104 fn ctx(&mut self) -> WasiCtxView<'_> {
105 WasiCtxView {
106 ctx: &mut self.wasi,
107 table: &mut self.table,
108 }
109 }
110}
111
112impl<C: Invoke> WasiHttpView for Ctx<C> {
113 fn ctx(&mut self) -> &mut WasiHttpCtx {
114 &mut self.http
115 }
116
117 fn table(&mut self) -> &mut ResourceTable {
118 &mut self.table
119 }
120}
121
122fn use_pooling_allocator_by_default() -> anyhow::Result<Option<bool>> {
124 const BITS_TO_TEST: u32 = 42;
125 let mut config = wasmtime::Config::new();
126 config.wasm_memory64(true);
127 config.memory_reservation(1 << BITS_TO_TEST);
128 let engine = wasmtime::Engine::new(&config)?;
129 let mut store = wasmtime::Store::new(&engine, ());
130 let ty = wasmtime::MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
133 if wasmtime::Memory::new(&mut store, ty).is_ok() {
134 Ok(Some(true))
135 } else {
136 Ok(None)
137 }
138}
139
140fn is_0_2(version: &str, min_patch: u64) -> bool {
141 if let Ok(semver::Version {
142 major,
143 minor,
144 patch,
145 pre,
146 build,
147 }) = version.parse()
148 {
149 major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
150 } else {
151 false
152 }
153}
154
155#[instrument(level = "trace", skip(adapter))]
156async fn instantiate_pre<C>(
157 adapter: &[u8],
158 workload: &str,
159) -> anyhow::Result<(
160 InstancePre<Ctx<C>>,
161 Engine,
162 Arc<[ResourceType]>,
163 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
164)>
165where
166 C: Invoke + Clone + 'static,
167 C::Context: Clone + 'static,
168{
169 let mut opts = wasmtime_cli_flags::CommonOptions::try_parse_from(iter::empty::<&'static str>())
170 .context("failed to construct common Wasmtime options")?;
171 let mut config = opts
172 .config(use_pooling_allocator_by_default().unwrap_or(None))
173 .context("failed to construct Wasmtime config")?;
174 config.wasm_component_model(true);
175 config.async_support(true);
176 let engine = wasmtime::Engine::new(&config).context("failed to initialize Wasmtime engine")?;
177
178 let wasm = if workload.starts_with('.') || workload.starts_with('/') {
179 fs::read(&workload)
180 .await
181 .with_context(|| format!("failed to read relative path to workload `{workload}`"))
182 .map(Workload::Binary)
183 } else {
184 Url::parse(workload)
185 .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
186 .map(Workload::Url)
187 }?;
188 let wasm = match wasm {
189 Workload::Url(wasm) => match wasm.scheme() {
190 "file" => {
191 let wasm = wasm
192 .to_file_path()
193 .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
194 fs::read(wasm)
195 .await
196 .context("failed to read Wasm from file URL")?
197 }
198 "http" | "https" => {
199 let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
200 let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
201 wasm.to_vec()
202 }
203 scheme => bail!("URL scheme `{scheme}` not supported"),
204 },
205 Workload::Binary(wasm) => wasm,
206 };
207 let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
208 wit_component::ComponentEncoder::default()
209 .validate(true)
210 .module(&wasm)
211 .context("failed to set core component module")?
212 .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
213 .context("failed to add WASI adapter")?
214 .encode()
215 .context("failed to encode a component")?
216 } else {
217 wasm
218 };
219
220 let component = Component::new(&engine, wasm).context("failed to compile component")?;
221
222 let mut linker = Linker::<Ctx<C>>::new(&engine);
223 wasmtime_wasi::p2::add_to_linker_async(&mut linker).context("failed to link WASI")?;
224 wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
225 .context("failed to link `wasi:http`")?;
226 wrpc_runtime_wasmtime::rpc::add_to_linker(&mut linker).context("failed to link `wrpc:rpc`")?;
227
228 let ty = component.component_type();
229 let mut host_resources = BTreeMap::default();
230 let mut guest_resources = Vec::new();
231 collect_component_resource_imports(&engine, &ty, &mut host_resources);
232 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
233 let io_err_tys = host_resources
234 .range::<str, _>((
235 Bound::Included("wasi:io/error@0.2"),
236 Bound::Excluded("wasi:io/error@0.3"),
237 ))
238 .map(|(name, instance)| {
239 instance
240 .get("error")
241 .copied()
242 .with_context(|| format!("{name} instance import missing `error` resource"))
243 })
244 .collect::<anyhow::Result<Box<[_]>>>()?;
245 let io_pollable_tys = host_resources
246 .range::<str, _>((
247 Bound::Included("wasi:io/poll@0.2"),
248 Bound::Excluded("wasi:io/poll@0.3"),
249 ))
250 .map(|(name, instance)| {
251 instance
252 .get("pollable")
253 .copied()
254 .with_context(|| format!("{name} instance import missing `pollable` resource"))
255 })
256 .collect::<anyhow::Result<Box<[_]>>>()?;
257 let io_input_stream_tys = host_resources
258 .range::<str, _>((
259 Bound::Included("wasi:io/streams@0.2"),
260 Bound::Excluded("wasi:io/streams@0.3"),
261 ))
262 .map(|(name, instance)| {
263 instance
264 .get("input-stream")
265 .copied()
266 .with_context(|| format!("{name} instance import missing `input-stream` resource"))
267 })
268 .collect::<anyhow::Result<Box<[_]>>>()?;
269 let io_output_stream_tys = host_resources
270 .range::<str, _>((
271 Bound::Included("wasi:io/streams@0.2"),
272 Bound::Excluded("wasi:io/streams@0.3"),
273 ))
274 .map(|(name, instance)| {
275 instance
276 .get("output-stream")
277 .copied()
278 .with_context(|| format!("{name} instance import missing `output-stream` resource"))
279 })
280 .collect::<anyhow::Result<Box<[_]>>>()?;
281 let rpc_err_ty = host_resources
282 .get("wrpc:rpc/error@0.1.0")
283 .map(|instance| {
284 instance
285 .get("error")
286 .copied()
287 .context("`wrpc:rpc/error@0.1.0` instance import missing `error` resource")
288 })
289 .transpose()?;
290 let host_resources = host_resources
292 .into_iter()
293 .map(|(name, instance)| {
294 let instance = instance
295 .into_iter()
296 .map(|(name, ty)| {
297 let host_ty = match ty {
298 ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
299 ty if io_err_tys.contains(&ty) => {
300 ResourceType::host::<wasmtime_wasi::p2::bindings::io::error::Error>()
301 }
302 ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
303 wasmtime_wasi::p2::bindings::io::streams::InputStream,
304 >(),
305 ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
306 wasmtime_wasi::p2::bindings::io::streams::OutputStream,
307 >(),
308 ty if io_pollable_tys.contains(&ty) => {
309 ResourceType::host::<wasmtime_wasi::p2::bindings::io::poll::Pollable>()
310 }
311 _ => ResourceType::host::<RemoteResource>(),
312 };
313 (name, (ty, host_ty))
314 })
315 .collect::<HashMap<_, _>>();
316 (name, instance)
317 })
318 .collect::<HashMap<_, _>>();
319 let host_resources = Arc::from(host_resources);
320 let guest_resources = Arc::from(guest_resources);
321 for (name, item) in ty.imports(&engine) {
322 match name.split_once('/').map(|(pkg, suffix)| {
324 suffix
325 .split_once('@')
326 .map_or((pkg, suffix, None), |(iface, version)| {
327 (pkg, iface, Some(version))
328 })
329 }) {
330 Some(("wrpc:rpc", "transport" | "error" | "context" | "invoker", Some("0.1.0"))) => {}
331 Some((
332 "wasi:cli",
333 "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
334 | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
335 Some(version),
336 )) if is_0_2(version, 0) => {}
337 Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
338 if is_0_2(version, 0) => {}
339 Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
340 Some(("wasi:filesystem", "preopens" | "types", Some(version)))
341 if is_0_2(version, 0) => {}
342 Some((
343 "wasi:http",
344 "incoming-handler" | "outgoing-handler" | "types",
345 Some(version),
346 )) if is_0_2(version, 0) => {}
347 Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
348 if is_0_2(version, 0) => {}
349 Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
350 if is_0_2(version, 0) => {}
351 Some((
352 "wasi:sockets",
353 "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
354 | "udp-create-socket" | "udp",
355 Some(version),
356 )) if is_0_2(version, 0) => {}
357 _ => {
358 if let Err(err) = link_item(
359 &engine,
360 &mut linker.root(),
361 Arc::clone(&guest_resources),
362 Arc::clone(&host_resources),
363 item,
364 "",
365 name,
366 ) {
367 error!(?err, "failed to polyfill instance");
368 }
369 }
370 }
371 }
372
373 let pre = linker
374 .instantiate_pre(&component)
375 .context("failed to pre-instantiate component")?;
376 Ok((pre, engine, guest_resources, host_resources))
377}
378
379fn new_store<C: Invoke>(
380 engine: &Engine,
381 wrpc: C,
382 cx: C::Context,
383 arg0: &str,
384 timeout: Duration,
385) -> wasmtime::Store<Ctx<C>> {
386 Store::new(
387 engine,
388 Ctx {
389 wasi: WasiCtxBuilder::new()
390 .inherit_env()
391 .inherit_stdio()
392 .inherit_network()
393 .allow_ip_name_lookup(true)
394 .allow_tcp(true)
395 .allow_udp(true)
396 .args(&[arg0])
397 .build(),
398 http: WasiHttpCtx::new(),
399 table: ResourceTable::new(),
400 wrpc: WrpcCtx {
401 wrpc,
402 cx,
403 shared_resources: SharedResourceTable::default(),
404 timeout,
405 },
406 },
407 )
408}
409
410#[instrument(level = "trace", skip(clt, cx), ret(level = "trace"))]
411pub async fn handle_run<C>(
412 clt: C,
413 cx: C::Context,
414 timeout: Duration,
415 workload: &str,
416) -> anyhow::Result<()>
417where
418 C: Invoke + Clone + 'static,
419 C::Context: Clone + 'static,
420{
421 let (pre, engine, _, _) =
422 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, workload).await?;
423 let mut store = new_store(&engine, clt, cx, "command.wasm", timeout);
424 let cmd = wasmtime_wasi::p2::bindings::CommandPre::new(pre)
425 .context("failed to construct `command` instance")?
426 .instantiate_async(&mut store)
427 .await
428 .context("failed to instantiate `command`")?;
429 cmd.wasi_cli_run()
430 .call_run(&mut store)
431 .await
432 .context("failed to run component")?
433 .map_err(|()| anyhow!("component failed"))
434}
435
436#[instrument(level = "trace", skip_all, ret(level = "trace"))]
437pub async fn serve_shared<C, S>(
438 handlers: &mut JoinSet<()>,
439 srv: S,
440 mut store: wasmtime::Store<Ctx<C>>,
441 pre: InstancePre<Ctx<C>>,
442 guest_resources: Arc<[ResourceType]>,
443 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
444) -> anyhow::Result<()>
445where
446 C: Invoke + 'static,
447 C::Context: Clone,
448 S: Serve,
449{
450 let span = Span::current();
451 let instance = pre
452 .instantiate_async(&mut store)
453 .await
454 .context("failed to instantiate component")?;
455 let engine = store.engine().clone();
456 let store = Arc::new(Mutex::new(store));
457 for (name, ty) in pre.component().component_type().exports(&engine) {
458 match (name, ty) {
459 (name, types::ComponentItem::ComponentFunc(ty)) => {
460 info!(?name, "serving root function");
461 let invocations = srv
462 .serve_function_shared(
463 Arc::clone(&store),
464 instance,
465 Arc::clone(&guest_resources),
466 Arc::clone(&host_resources),
467 ty,
468 "",
469 name,
470 )
471 .await?;
472 handlers.spawn(
473 async move {
474 let mut invocations = pin!(invocations);
475 while let Some(invocation) = invocations.next().await {
476 match invocation {
477 Ok((_, fut)) => {
478 info!("serving root function invocation");
479 if let Err(err) = fut.await {
480 warn!(?err, "failed to serve root function invocation");
481 } else {
482 info!("successfully served root function invocation");
483 }
484 }
485 Err(err) => {
486 error!(?err, "failed to accept root function invocation");
487 }
488 }
489 }
490 }
491 .instrument(span.clone()),
492 );
493 }
494 (_, types::ComponentItem::CoreFunc(_)) => {
495 warn!(name, "serving root core function exports not supported yet");
496 }
497 (_, types::ComponentItem::Module(_)) => {
498 warn!(name, "serving root module exports not supported yet");
499 }
500 (_, types::ComponentItem::Component(_)) => {
501 warn!(name, "serving root component exports not supported yet");
502 }
503 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
504 for (name, ty) in ty.exports(&engine) {
505 match ty {
506 types::ComponentItem::ComponentFunc(ty) => {
507 info!(?name, "serving instance function");
508 let invocations = srv
509 .serve_function_shared(
510 Arc::clone(&store),
511 instance,
512 Arc::clone(&guest_resources),
513 Arc::clone(&host_resources),
514 ty,
515 instance_name,
516 name,
517 )
518 .await?;
519 handlers.spawn(async move {
520 let mut invocations = pin!(invocations);
521 while let Some(invocation) = invocations.next().await {
522 match invocation {
523 Ok((_, fut)) => {
524 info!("serving instance function invocation");
525 if let Err(err) = fut.await {
526 warn!(
527 ?err,
528 "failed to serve instance function invocation"
529 );
530 } else {
531 info!(
532 "successfully served instance function invocation"
533 );
534 }
535 }
536 Err(err) => {
537 error!(
538 ?err,
539 "failed to accept instance function invocation"
540 );
541 }
542 }
543 }
544 }
545 .instrument(span.clone()));
546 }
547 types::ComponentItem::CoreFunc(_) => {
548 warn!(
549 instance_name,
550 name, "serving instance core function exports not supported yet"
551 );
552 }
553 types::ComponentItem::Module(_) => {
554 warn!(
555 instance_name,
556 name, "serving instance module exports not supported yet"
557 );
558 }
559 types::ComponentItem::Component(_) => {
560 warn!(
561 instance_name,
562 name, "serving instance component exports not supported yet"
563 );
564 }
565 types::ComponentItem::ComponentInstance(_) => {
566 warn!(
567 instance_name,
568 name, "serving nested instance exports not supported yet"
569 );
570 }
571 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
572 }
573 }
574 }
575 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
576 }
577 }
578 Ok(())
579}
580
581#[instrument(level = "trace", skip_all, ret(level = "trace"))]
582#[allow(clippy::too_many_arguments)]
583pub async fn serve_stateless<C, S>(
584 handlers: &mut JoinSet<()>,
585 srv: S,
586 clt: C,
587 cx: C::Context,
588 pre: InstancePre<Ctx<C>>,
589 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
590 engine: &Engine,
591 timeout: Duration,
592) -> anyhow::Result<()>
593where
594 C: Invoke + Clone + 'static,
595 C::Context: Clone + 'static,
596 S: Serve,
597{
598 let span = Span::current();
599 for (name, ty) in pre.component().component_type().exports(engine) {
600 match (name, ty) {
601 (name, types::ComponentItem::ComponentFunc(ty)) => {
602 let clt = clt.clone();
603 let cx = cx.clone();
604 let engine = engine.clone();
605 info!(?name, "serving root function");
606 let invocations = srv
607 .serve_function(
608 move || {
609 new_store(&engine, clt.clone(), cx.clone(), "reactor.wasm", timeout)
610 },
611 pre.clone(),
612 Arc::clone(&host_resources),
613 ty,
614 "",
615 name,
616 )
617 .await?;
618 handlers.spawn(
619 async move {
620 let mut invocations = pin!(invocations);
621 while let Some(invocation) = invocations.next().await {
622 match invocation {
623 Ok((_, fut)) => {
624 info!("serving root function invocation");
625 if let Err(err) = fut.await {
626 warn!(?err, "failed to serve root function invocation");
627 } else {
628 info!("successfully served root function invocation");
629 }
630 }
631 Err(err) => {
632 error!(?err, "failed to accept root function invocation");
633 }
634 }
635 }
636 }
637 .instrument(span.clone()),
638 );
639 }
640 (_, types::ComponentItem::CoreFunc(_)) => {
641 warn!(name, "serving root core function exports not supported yet");
642 }
643 (_, types::ComponentItem::Module(_)) => {
644 warn!(name, "serving root module exports not supported yet");
645 }
646 (_, types::ComponentItem::Component(_)) => {
647 warn!(name, "serving root component exports not supported yet");
648 }
649 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
650 for (name, ty) in ty.exports(engine) {
651 match ty {
652 types::ComponentItem::ComponentFunc(ty) => {
653 let clt = clt.clone();
654 let engine = engine.clone();
655 let cx = cx.clone();
656 info!(?name, "serving instance function");
657 let invocations = srv
658 .serve_function(
659 move || {
660 new_store(
661 &engine,
662 clt.clone(),
663 cx.clone(),
664 "reactor.wasm",
665 timeout,
666 )
667 },
668 pre.clone(),
669 Arc::clone(&host_resources),
670 ty,
671 instance_name,
672 name,
673 )
674 .await?;
675 handlers.spawn(async move {
676 let mut invocations = pin!(invocations);
677 while let Some(invocation) = invocations.next().await {
678 match invocation {
679 Ok((_, fut)) => {
680 info!("serving instance function invocation");
681 if let Err(err) = fut.await {
682 warn!(
683 ?err,
684 "failed to serve instance function invocation"
685 );
686 } else {
687 info!(
688 "successfully served instance function invocation"
689 );
690 }
691 }
692 Err(err) => {
693 error!(
694 ?err,
695 "failed to accept instance function invocation"
696 );
697 }
698 }
699 }
700 }.instrument(span.clone()));
701 }
702 types::ComponentItem::CoreFunc(_) => {
703 warn!(
704 instance_name,
705 name, "serving instance core function exports not supported yet"
706 );
707 }
708 types::ComponentItem::Module(_) => {
709 warn!(
710 instance_name,
711 name, "serving instance module exports not supported yet"
712 );
713 }
714 types::ComponentItem::Component(_) => {
715 warn!(
716 instance_name,
717 name, "serving instance component exports not supported yet"
718 );
719 }
720 types::ComponentItem::ComponentInstance(_) => {
721 warn!(
722 instance_name,
723 name, "serving nested instance exports not supported yet"
724 );
725 }
726 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
727 }
728 }
729 }
730 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
731 }
732 }
733 Ok(())
734}
735
736#[instrument(level = "trace", skip(srv, clt, cx), ret(level = "trace"))]
737pub async fn handle_serve<C, S>(
738 srv: S,
739 clt: C,
740 cx: C::Context,
741 timeout: Duration,
742 workload: &str,
743) -> anyhow::Result<()>
744where
745 C: Invoke + Clone + 'static,
746 C::Context: Clone + 'static,
747 S: Serve,
748{
749 let (pre, engine, guest_resources, host_resources) =
750 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, workload).await?;
751
752 let mut handlers = JoinSet::new();
753 if guest_resources.is_empty() {
754 serve_stateless(
755 &mut handlers,
756 srv,
757 clt,
758 cx,
759 pre,
760 host_resources,
761 &engine,
762 timeout,
763 )
764 .await?;
765 } else {
766 serve_shared(
767 &mut handlers,
768 srv,
769 new_store(&engine, clt, cx, "reactor.wasm", timeout),
770 pre,
771 guest_resources,
772 host_resources,
773 )
774 .await?;
775 }
776 while let Some(res) = handlers.join_next().await {
777 if let Err(err) = res {
778 error!(?err, "handler failed");
779 }
780 }
781 Ok(())
782}
783
784#[instrument(level = "trace", ret(level = "trace"))]
785pub async fn run() -> anyhow::Result<()> {
786 wrpc_cli::tracing::init();
787 match Command::parse() {
788 Command::Nats(args) => nats::run(args).await,
789 Command::Tcp(args) => tcp::run(args).await,
790 }
791}