// The nested resource maps passed between the functions in this file
// (e.g. `Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>`)
// trip `clippy::type_complexity`; the lint is silenced crate-wide rather than per signature.
#![allow(clippy::type_complexity)]
2
3use core::iter;
4use core::ops::Bound;
5use core::pin::pin;
6use core::time::Duration;
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11use anyhow::{anyhow, bail, Context as _};
12use clap::Parser;
13use futures::StreamExt as _;
14use tokio::fs;
15use tokio::sync::Mutex;
16use tokio::task::JoinSet;
17use tracing::{error, info, instrument, warn, Instrument as _, Span};
18use url::Url;
19use wasi_preview1_component_adapter_provider::{
20 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
21 WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
22};
23use wasmtime::component::{types, Component, InstancePre, Linker, ResourceType};
24use wasmtime::{Engine, Store};
25use wasmtime_wasi::{IoView, ResourceTable, WasiCtx, WasiCtxBuilder, WasiView};
26use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
27use wrpc_runtime_wasmtime::{
28 collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
29 RemoteResource, ServeExt as _, SharedResourceTable, WrpcView,
30};
31use wrpc_transport::{Invoke, Serve};
32
// Subcommand implementations: each module provides the `Command` type wrapped by
// the top-level `Command` enum in this file and the `run` function it is dispatched to.
mod nats;
mod tcp;
35
/// Default timeout, expressed as a human-readable duration string.
// NOTE(review): not referenced anywhere in this file; presumably consumed by the
// `nats`/`tcp` submodules as the default for a timeout CLI flag — confirm.
const DEFAULT_TIMEOUT: &str = "10s";
37
// Top-level command line interface, parsed in `run`.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments
// into help text, which would change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
enum Command {
    // Subcommands declared by the `nats` module; handled by `nats::run`.
    #[command(subcommand)]
    Nats(nats::Command),
    // Subcommands declared by the `tcp` module; handled by `tcp::run`.
    #[command(subcommand)]
    Tcp(tcp::Command),
}
46
/// Where the bytes of a Wasm workload come from.
pub enum Workload {
    /// Workload referenced by URL; `instantiate_pre` in this file resolves the
    /// `file`, `http` and `https` schemes and rejects every other scheme.
    Url(Url),
    /// Workload bytes that are already in memory.
    Binary(Vec<u8>),
}
51
/// Store data used for every component instance created by this file.
///
/// It bundles the state behind the `IoView`, `WasiView`, `WasiHttpView` and
/// `WrpcView` implementations that this file provides for the type.
pub struct Ctx<C: Invoke> {
    /// Resource table handed out by `IoView::table`.
    pub table: ResourceTable,
    /// WASI context handed out by `WasiView::ctx`.
    pub wasi: WasiCtx,
    /// `wasi:http` context handed out by `WasiHttpView::ctx`.
    pub http: WasiHttpCtx,
    /// wRPC client handed out by `WrpcView::client`.
    pub wrpc: C,
    /// Invocation context; a clone is returned from every `WrpcView::context` call.
    pub cx: C::Context,
    /// Table handed out by `WrpcView::shared_resources`.
    pub shared_resources: SharedResourceTable,
    /// Timeout reported (always as `Some`) by `WrpcView::timeout`.
    pub timeout: Duration,
}
61
62impl<C> WrpcView for Ctx<C>
63where
64 C: Invoke,
65 C::Context: Clone,
66{
67 type Invoke = C;
68
69 fn context(&self) -> C::Context {
70 self.cx.clone()
71 }
72
73 fn client(&self) -> &Self::Invoke {
74 &self.wrpc
75 }
76
77 fn shared_resources(&mut self) -> &mut SharedResourceTable {
78 &mut self.shared_resources
79 }
80
81 fn timeout(&self) -> Option<Duration> {
82 Some(self.timeout)
83 }
84}
85
86impl<C: Invoke> IoView for Ctx<C> {
87 fn table(&mut self) -> &mut ResourceTable {
88 &mut self.table
89 }
90}
91
92impl<C: Invoke> WasiView for Ctx<C> {
93 fn ctx(&mut self) -> &mut WasiCtx {
94 &mut self.wasi
95 }
96}
97
98impl<C: Invoke> WasiHttpView for Ctx<C> {
99 fn ctx(&mut self) -> &mut WasiHttpCtx {
100 &mut self.http
101 }
102}
103
104fn use_pooling_allocator_by_default() -> anyhow::Result<Option<bool>> {
106 const BITS_TO_TEST: u32 = 42;
107 let mut config = wasmtime::Config::new();
108 config.wasm_memory64(true);
109 config.memory_reservation(1 << BITS_TO_TEST);
110 let engine = wasmtime::Engine::new(&config)?;
111 let mut store = wasmtime::Store::new(&engine, ());
112 let ty = wasmtime::MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
115 if wasmtime::Memory::new(&mut store, ty).is_ok() {
116 Ok(Some(true))
117 } else {
118 Ok(None)
119 }
120}
121
122fn is_0_2(version: &str, min_patch: u64) -> bool {
123 if let Ok(semver::Version {
124 major,
125 minor,
126 patch,
127 pre,
128 build,
129 }) = version.parse()
130 {
131 major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
132 } else {
133 false
134 }
135}
136
137#[instrument(level = "trace", skip(adapter))]
138async fn instantiate_pre<C>(
139 adapter: &[u8],
140 workload: &str,
141) -> anyhow::Result<(
142 InstancePre<Ctx<C>>,
143 Engine,
144 Arc<[ResourceType]>,
145 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
146)>
147where
148 C: Invoke + Clone + 'static,
149 C::Context: Clone + 'static,
150{
151 let mut opts = wasmtime_cli_flags::CommonOptions::try_parse_from(iter::empty::<&'static str>())
152 .context("failed to construct common Wasmtime options")?;
153 let mut config = opts
154 .config(use_pooling_allocator_by_default().unwrap_or(None))
155 .context("failed to construct Wasmtime config")?;
156 config.wasm_component_model(true);
157 config.async_support(true);
158 let engine = wasmtime::Engine::new(&config).context("failed to initialize Wasmtime engine")?;
159
160 let wasm = if workload.starts_with('.') || workload.starts_with('/') {
161 fs::read(&workload)
162 .await
163 .with_context(|| format!("failed to read relative path to workload `{workload}`"))
164 .map(Workload::Binary)
165 } else {
166 Url::parse(workload)
167 .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
168 .map(Workload::Url)
169 }?;
170 let wasm = match wasm {
171 Workload::Url(wasm) => match wasm.scheme() {
172 "file" => {
173 let wasm = wasm
174 .to_file_path()
175 .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
176 fs::read(wasm)
177 .await
178 .context("failed to read Wasm from file URL")?
179 }
180 "http" | "https" => {
181 let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
182 let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
183 wasm.to_vec()
184 }
185 scheme => bail!("URL scheme `{scheme}` not supported"),
186 },
187 Workload::Binary(wasm) => wasm,
188 };
189 let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
190 wit_component::ComponentEncoder::default()
191 .validate(true)
192 .module(&wasm)
193 .context("failed to set core component module")?
194 .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
195 .context("failed to add WASI adapter")?
196 .encode()
197 .context("failed to encode a component")?
198 } else {
199 wasm
200 };
201
202 let component = Component::new(&engine, wasm).context("failed to compile component")?;
203
204 let mut linker = Linker::<Ctx<C>>::new(&engine);
205 wasmtime_wasi::add_to_linker_async(&mut linker).context("failed to link WASI")?;
206 wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
207 .context("failed to link `wasi:http`")?;
208 wrpc_runtime_wasmtime::rpc::add_to_linker(&mut linker).context("failed to link `wrpc:rpc`")?;
209
210 let ty = component.component_type();
211 let mut host_resources = BTreeMap::default();
212 let mut guest_resources = Vec::new();
213 collect_component_resource_imports(&engine, &ty, &mut host_resources);
214 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
215 let io_err_tys = host_resources
216 .range::<str, _>((
217 Bound::Included("wasi:io/error@0.2"),
218 Bound::Excluded("wasi:io/error@0.3"),
219 ))
220 .map(|(name, instance)| {
221 instance
222 .get("error")
223 .copied()
224 .with_context(|| format!("{name} instance import missing `error` resource"))
225 })
226 .collect::<anyhow::Result<Box<[_]>>>()?;
227 let io_pollable_tys = host_resources
228 .range::<str, _>((
229 Bound::Included("wasi:io/poll@0.2"),
230 Bound::Excluded("wasi:io/poll@0.3"),
231 ))
232 .map(|(name, instance)| {
233 instance
234 .get("pollable")
235 .copied()
236 .with_context(|| format!("{name} instance import missing `pollable` resource"))
237 })
238 .collect::<anyhow::Result<Box<[_]>>>()?;
239 let io_input_stream_tys = host_resources
240 .range::<str, _>((
241 Bound::Included("wasi:io/streams@0.2"),
242 Bound::Excluded("wasi:io/streams@0.3"),
243 ))
244 .map(|(name, instance)| {
245 instance
246 .get("input-stream")
247 .copied()
248 .with_context(|| format!("{name} instance import missing `input-stream` resource"))
249 })
250 .collect::<anyhow::Result<Box<[_]>>>()?;
251 let io_output_stream_tys = host_resources
252 .range::<str, _>((
253 Bound::Included("wasi:io/streams@0.2"),
254 Bound::Excluded("wasi:io/streams@0.3"),
255 ))
256 .map(|(name, instance)| {
257 instance
258 .get("output-stream")
259 .copied()
260 .with_context(|| format!("{name} instance import missing `output-stream` resource"))
261 })
262 .collect::<anyhow::Result<Box<[_]>>>()?;
263 let rpc_err_ty = host_resources
264 .get("wrpc:rpc/error@0.1.0")
265 .map(|instance| {
266 instance
267 .get("error")
268 .copied()
269 .context("`wrpc:rpc/error@0.1.0` instance import missing `error` resource")
270 })
271 .transpose()?;
272 let host_resources = host_resources
274 .into_iter()
275 .map(|(name, instance)| {
276 let instance = instance
277 .into_iter()
278 .map(|(name, ty)| {
279 let host_ty = match ty {
280 ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
281 ty if io_err_tys.contains(&ty) => {
282 ResourceType::host::<wasmtime_wasi::bindings::io::error::Error>()
283 }
284 ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
285 wasmtime_wasi::bindings::io::streams::InputStream,
286 >(),
287 ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
288 wasmtime_wasi::bindings::io::streams::OutputStream,
289 >(),
290 ty if io_pollable_tys.contains(&ty) => {
291 ResourceType::host::<wasmtime_wasi::bindings::io::poll::Pollable>()
292 }
293 _ => ResourceType::host::<RemoteResource>(),
294 };
295 (name, (ty, host_ty))
296 })
297 .collect::<HashMap<_, _>>();
298 (name, instance)
299 })
300 .collect::<HashMap<_, _>>();
301 let host_resources = Arc::from(host_resources);
302 let guest_resources = Arc::from(guest_resources);
303 for (name, item) in ty.imports(&engine) {
304 match name.split_once('/').map(|(pkg, suffix)| {
306 suffix
307 .split_once('@')
308 .map_or((pkg, suffix, None), |(iface, version)| {
309 (pkg, iface, Some(version))
310 })
311 }) {
312 Some(("wrpc:rpc", "transport" | "error" | "context" | "invoker", Some("0.1.0"))) => {}
313 Some((
314 "wasi:cli",
315 "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
316 | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
317 Some(version),
318 )) if is_0_2(version, 0) => {}
319 Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
320 if is_0_2(version, 0) => {}
321 Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
322 Some(("wasi:filesystem", "preopens" | "types", Some(version)))
323 if is_0_2(version, 0) => {}
324 Some((
325 "wasi:http",
326 "incoming-handler" | "outgoing-handler" | "types",
327 Some(version),
328 )) if is_0_2(version, 0) => {}
329 Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
330 if is_0_2(version, 0) => {}
331 Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
332 if is_0_2(version, 0) => {}
333 Some((
334 "wasi:sockets",
335 "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
336 | "udp-create-socket" | "udp",
337 Some(version),
338 )) if is_0_2(version, 0) => {}
339 _ => {
340 if let Err(err) = link_item(
341 &engine,
342 &mut linker.root(),
343 Arc::clone(&guest_resources),
344 Arc::clone(&host_resources),
345 item,
346 "",
347 name,
348 ) {
349 error!(?err, "failed to polyfill instance");
350 }
351 }
352 }
353 }
354
355 let pre = linker
356 .instantiate_pre(&component)
357 .context("failed to pre-instantiate component")?;
358 Ok((pre, engine, guest_resources, host_resources))
359}
360
361fn new_store<C: Invoke>(
362 engine: &Engine,
363 wrpc: C,
364 cx: C::Context,
365 arg0: &str,
366 timeout: Duration,
367) -> wasmtime::Store<Ctx<C>> {
368 Store::new(
369 engine,
370 Ctx {
371 wasi: WasiCtxBuilder::new()
372 .inherit_env()
373 .inherit_stdio()
374 .inherit_network()
375 .allow_ip_name_lookup(true)
376 .allow_tcp(true)
377 .allow_udp(true)
378 .args(&[arg0])
379 .build(),
380 http: WasiHttpCtx::new(),
381 table: ResourceTable::new(),
382 shared_resources: SharedResourceTable::default(),
383 wrpc,
384 cx,
385 timeout,
386 },
387 )
388}
389
390#[instrument(level = "trace", skip(clt, cx), ret(level = "trace"))]
391pub async fn handle_run<C>(
392 clt: C,
393 cx: C::Context,
394 timeout: Duration,
395 workload: &str,
396) -> anyhow::Result<()>
397where
398 C: Invoke + Clone + 'static,
399 C::Context: Clone + 'static,
400{
401 let (pre, engine, _, _) =
402 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, workload).await?;
403 let mut store = new_store(&engine, clt, cx, "command.wasm", timeout);
404 let cmd = wasmtime_wasi::bindings::CommandPre::new(pre)
405 .context("failed to construct `command` instance")?
406 .instantiate_async(&mut store)
407 .await
408 .context("failed to instantiate `command`")?;
409 cmd.wasi_cli_run()
410 .call_run(&mut store)
411 .await
412 .context("failed to run component")?
413 .map_err(|()| anyhow!("component failed"))
414}
415
416#[instrument(level = "trace", skip_all, ret(level = "trace"))]
417pub async fn serve_shared<C, S>(
418 handlers: &mut JoinSet<()>,
419 srv: S,
420 mut store: wasmtime::Store<Ctx<C>>,
421 pre: InstancePre<Ctx<C>>,
422 guest_resources: Arc<[ResourceType]>,
423 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
424) -> anyhow::Result<()>
425where
426 C: Invoke + 'static,
427 C::Context: Clone,
428 S: Serve,
429{
430 let span = Span::current();
431 let instance = pre
432 .instantiate_async(&mut store)
433 .await
434 .context("failed to instantiate component")?;
435 let engine = store.engine().clone();
436 let store = Arc::new(Mutex::new(store));
437 for (name, ty) in pre.component().component_type().exports(&engine) {
438 match (name, ty) {
439 (name, types::ComponentItem::ComponentFunc(ty)) => {
440 info!(?name, "serving root function");
441 let invocations = srv
442 .serve_function_shared(
443 Arc::clone(&store),
444 instance,
445 Arc::clone(&guest_resources),
446 Arc::clone(&host_resources),
447 ty,
448 "",
449 name,
450 )
451 .await?;
452 handlers.spawn(
453 async move {
454 let mut invocations = pin!(invocations);
455 while let Some(invocation) = invocations.next().await {
456 match invocation {
457 Ok((_, fut)) => {
458 info!("serving root function invocation");
459 if let Err(err) = fut.await {
460 warn!(?err, "failed to serve root function invocation");
461 } else {
462 info!("successfully served root function invocation");
463 }
464 }
465 Err(err) => {
466 error!(?err, "failed to accept root function invocation");
467 }
468 }
469 }
470 }
471 .instrument(span.clone()),
472 );
473 }
474 (_, types::ComponentItem::CoreFunc(_)) => {
475 warn!(name, "serving root core function exports not supported yet");
476 }
477 (_, types::ComponentItem::Module(_)) => {
478 warn!(name, "serving root module exports not supported yet");
479 }
480 (_, types::ComponentItem::Component(_)) => {
481 warn!(name, "serving root component exports not supported yet");
482 }
483 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
484 for (name, ty) in ty.exports(&engine) {
485 match ty {
486 types::ComponentItem::ComponentFunc(ty) => {
487 info!(?name, "serving instance function");
488 let invocations = srv
489 .serve_function_shared(
490 Arc::clone(&store),
491 instance,
492 Arc::clone(&guest_resources),
493 Arc::clone(&host_resources),
494 ty,
495 instance_name,
496 name,
497 )
498 .await?;
499 handlers.spawn(async move {
500 let mut invocations = pin!(invocations);
501 while let Some(invocation) = invocations.next().await {
502 match invocation {
503 Ok((_, fut)) => {
504 info!("serving instance function invocation");
505 if let Err(err) = fut.await {
506 warn!(
507 ?err,
508 "failed to serve instance function invocation"
509 );
510 } else {
511 info!(
512 "successfully served instance function invocation"
513 );
514 }
515 }
516 Err(err) => {
517 error!(
518 ?err,
519 "failed to accept instance function invocation"
520 );
521 }
522 }
523 }
524 }
525 .instrument(span.clone()));
526 }
527 types::ComponentItem::CoreFunc(_) => {
528 warn!(
529 instance_name,
530 name, "serving instance core function exports not supported yet"
531 );
532 }
533 types::ComponentItem::Module(_) => {
534 warn!(
535 instance_name,
536 name, "serving instance module exports not supported yet"
537 );
538 }
539 types::ComponentItem::Component(_) => {
540 warn!(
541 instance_name,
542 name, "serving instance component exports not supported yet"
543 );
544 }
545 types::ComponentItem::ComponentInstance(_) => {
546 warn!(
547 instance_name,
548 name, "serving nested instance exports not supported yet"
549 );
550 }
551 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
552 }
553 }
554 }
555 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
556 }
557 }
558 Ok(())
559}
560
561#[instrument(level = "trace", skip_all, ret(level = "trace"))]
562#[allow(clippy::too_many_arguments)]
563pub async fn serve_stateless<C, S>(
564 handlers: &mut JoinSet<()>,
565 srv: S,
566 clt: C,
567 cx: C::Context,
568 pre: InstancePre<Ctx<C>>,
569 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
570 engine: &Engine,
571 timeout: Duration,
572) -> anyhow::Result<()>
573where
574 C: Invoke + Clone + 'static,
575 C::Context: Clone + 'static,
576 S: Serve,
577{
578 let span = Span::current();
579 for (name, ty) in pre.component().component_type().exports(engine) {
580 match (name, ty) {
581 (name, types::ComponentItem::ComponentFunc(ty)) => {
582 let clt = clt.clone();
583 let cx = cx.clone();
584 let engine = engine.clone();
585 info!(?name, "serving root function");
586 let invocations = srv
587 .serve_function(
588 move || {
589 new_store(&engine, clt.clone(), cx.clone(), "reactor.wasm", timeout)
590 },
591 pre.clone(),
592 Arc::clone(&host_resources),
593 ty,
594 "",
595 name,
596 )
597 .await?;
598 handlers.spawn(
599 async move {
600 let mut invocations = pin!(invocations);
601 while let Some(invocation) = invocations.next().await {
602 match invocation {
603 Ok((_, fut)) => {
604 info!("serving root function invocation");
605 if let Err(err) = fut.await {
606 warn!(?err, "failed to serve root function invocation");
607 } else {
608 info!("successfully served root function invocation");
609 }
610 }
611 Err(err) => {
612 error!(?err, "failed to accept root function invocation");
613 }
614 }
615 }
616 }
617 .instrument(span.clone()),
618 );
619 }
620 (_, types::ComponentItem::CoreFunc(_)) => {
621 warn!(name, "serving root core function exports not supported yet");
622 }
623 (_, types::ComponentItem::Module(_)) => {
624 warn!(name, "serving root module exports not supported yet");
625 }
626 (_, types::ComponentItem::Component(_)) => {
627 warn!(name, "serving root component exports not supported yet");
628 }
629 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
630 for (name, ty) in ty.exports(engine) {
631 match ty {
632 types::ComponentItem::ComponentFunc(ty) => {
633 let clt = clt.clone();
634 let engine = engine.clone();
635 let cx = cx.clone();
636 info!(?name, "serving instance function");
637 let invocations = srv
638 .serve_function(
639 move || {
640 new_store(
641 &engine,
642 clt.clone(),
643 cx.clone(),
644 "reactor.wasm",
645 timeout,
646 )
647 },
648 pre.clone(),
649 Arc::clone(&host_resources),
650 ty,
651 instance_name,
652 name,
653 )
654 .await?;
655 handlers.spawn(async move {
656 let mut invocations = pin!(invocations);
657 while let Some(invocation) = invocations.next().await {
658 match invocation {
659 Ok((_, fut)) => {
660 info!("serving instance function invocation");
661 if let Err(err) = fut.await {
662 warn!(
663 ?err,
664 "failed to serve instance function invocation"
665 );
666 } else {
667 info!(
668 "successfully served instance function invocation"
669 );
670 }
671 }
672 Err(err) => {
673 error!(
674 ?err,
675 "failed to accept instance function invocation"
676 );
677 }
678 }
679 }
680 }.instrument(span.clone()));
681 }
682 types::ComponentItem::CoreFunc(_) => {
683 warn!(
684 instance_name,
685 name, "serving instance core function exports not supported yet"
686 );
687 }
688 types::ComponentItem::Module(_) => {
689 warn!(
690 instance_name,
691 name, "serving instance module exports not supported yet"
692 );
693 }
694 types::ComponentItem::Component(_) => {
695 warn!(
696 instance_name,
697 name, "serving instance component exports not supported yet"
698 );
699 }
700 types::ComponentItem::ComponentInstance(_) => {
701 warn!(
702 instance_name,
703 name, "serving nested instance exports not supported yet"
704 );
705 }
706 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
707 }
708 }
709 }
710 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
711 }
712 }
713 Ok(())
714}
715
716#[instrument(level = "trace", skip(srv, clt, cx), ret(level = "trace"))]
717pub async fn handle_serve<C, S>(
718 srv: S,
719 clt: C,
720 cx: C::Context,
721 timeout: Duration,
722 workload: &str,
723) -> anyhow::Result<()>
724where
725 C: Invoke + Clone + 'static,
726 C::Context: Clone + 'static,
727 S: Serve,
728{
729 let (pre, engine, guest_resources, host_resources) =
730 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, workload).await?;
731
732 let mut handlers = JoinSet::new();
733 if guest_resources.is_empty() {
734 serve_stateless(
735 &mut handlers,
736 srv,
737 clt,
738 cx,
739 pre,
740 host_resources,
741 &engine,
742 timeout,
743 )
744 .await?;
745 } else {
746 serve_shared(
747 &mut handlers,
748 srv,
749 new_store(&engine, clt, cx, "reactor.wasm", timeout),
750 pre,
751 guest_resources,
752 host_resources,
753 )
754 .await?;
755 }
756 while let Some(res) = handlers.join_next().await {
757 if let Err(err) = res {
758 error!(?err, "handler failed");
759 }
760 }
761 Ok(())
762}
763
764#[instrument(level = "trace", ret(level = "trace"))]
765pub async fn run() -> anyhow::Result<()> {
766 wrpc_cli::tracing::init();
767 match Command::parse() {
768 Command::Nats(args) => nats::run(args).await,
769 Command::Tcp(args) => tcp::run(args).await,
770 }
771}