wrpc_wasmtime_cli/
lib.rs

1#![allow(clippy::type_complexity)]
2
3use core::iter;
4use core::ops::Bound;
5use core::pin::pin;
6use core::time::Duration;
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11use anyhow::{anyhow, bail, Context as _};
12use clap::Parser;
13use futures::StreamExt as _;
14use tokio::fs;
15use tokio::sync::Mutex;
16use tokio::task::JoinSet;
17use tracing::{error, info, instrument, warn, Instrument as _, Span};
18use url::Url;
19use wasi_preview1_component_adapter_provider::{
20    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
21    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
22};
23use wasmtime::component::{types, Component, InstancePre, Linker, ResourceType};
24use wasmtime::{Engine, Store};
25use wasmtime_wasi::{IoView, ResourceTable, WasiCtx, WasiCtxBuilder, WasiView};
26use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
27use wrpc_runtime_wasmtime::{
28    collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
29    RemoteResource, ServeExt as _, SharedResourceTable, WrpcView,
30};
31use wrpc_transport::{Invoke, Serve};
32
33mod nats;
34mod tcp;
35
// Default timeout, spelled as a human-readable duration string.
// NOTE(review): not referenced in this file; presumably parsed into a `Duration` by the
// `nats`/`tcp` subcommand modules - confirm.
const DEFAULT_TIMEOUT: &str = "10s";
37
// Top-level CLI command, parsed in `run` and dispatched to the matching transport module.
//
// NOTE: plain `//` comments are used here on purpose, since `///` doc comments on `clap`
// derive items are turned into help text.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
enum Command {
    // Subcommands declared by the `nats` module
    #[command(subcommand)]
    Nats(nats::Command),
    // Subcommands declared by the `tcp` module
    #[command(subcommand)]
    Tcp(tcp::Command),
}
46
/// Source of a Wasm workload.
pub enum Workload {
    /// Location the Wasm has to be loaded from.
    Url(Url),
    /// Wasm bytes, which are already available in memory.
    Binary(Vec<u8>),
}
51
/// Data stored in each [`Store`], generic over the wRPC client `C`.
pub struct Ctx<C: Invoke> {
    /// Resource table handed out via [`IoView::table`]
    pub table: ResourceTable,
    /// WASI context handed out via [`WasiView::ctx`]
    pub wasi: WasiCtx,
    /// `wasi:http` context handed out via [`WasiHttpView::ctx`]
    pub http: WasiHttpCtx,
    /// wRPC client handed out via [`WrpcView::client`]
    pub wrpc: C,
    /// wRPC invocation context, a clone of which is returned by [`WrpcView::context`]
    pub cx: C::Context,
    /// Shared resource table handed out via [`WrpcView::shared_resources`]
    pub shared_resources: SharedResourceTable,
    /// Timeout reported by [`WrpcView::timeout`]
    pub timeout: Duration,
}
61
62impl<C> WrpcView for Ctx<C>
63where
64    C: Invoke,
65    C::Context: Clone,
66{
67    type Invoke = C;
68
69    fn context(&self) -> C::Context {
70        self.cx.clone()
71    }
72
73    fn client(&self) -> &Self::Invoke {
74        &self.wrpc
75    }
76
77    fn shared_resources(&mut self) -> &mut SharedResourceTable {
78        &mut self.shared_resources
79    }
80
81    fn timeout(&self) -> Option<Duration> {
82        Some(self.timeout)
83    }
84}
85
86impl<C: Invoke> IoView for Ctx<C> {
87    fn table(&mut self) -> &mut ResourceTable {
88        &mut self.table
89    }
90}
91
92impl<C: Invoke> WasiView for Ctx<C> {
93    fn ctx(&mut self) -> &mut WasiCtx {
94        &mut self.wasi
95    }
96}
97
98impl<C: Invoke> WasiHttpView for Ctx<C> {
99    fn ctx(&mut self) -> &mut WasiHttpCtx {
100        &mut self.http
101    }
102}
103
104// https://github.com/bytecodealliance/wasmtime/blob/b943666650696f1eb7ff8b217762b58d5ef5779d/src/commands/serve.rs#L641-L656
105fn use_pooling_allocator_by_default() -> anyhow::Result<Option<bool>> {
106    const BITS_TO_TEST: u32 = 42;
107    let mut config = wasmtime::Config::new();
108    config.wasm_memory64(true);
109    config.memory_reservation(1 << BITS_TO_TEST);
110    let engine = wasmtime::Engine::new(&config)?;
111    let mut store = wasmtime::Store::new(&engine, ());
112    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
113    // page size here from the maximum size.
114    let ty = wasmtime::MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
115    if wasmtime::Memory::new(&mut store, ty).is_ok() {
116        Ok(Some(true))
117    } else {
118        Ok(None)
119    }
120}
121
122fn is_0_2(version: &str, min_patch: u64) -> bool {
123    if let Ok(semver::Version {
124        major,
125        minor,
126        patch,
127        pre,
128        build,
129    }) = version.parse()
130    {
131        major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
132    } else {
133        false
134    }
135}
136
137#[instrument(level = "trace", skip(adapter))]
138async fn instantiate_pre<C>(
139    adapter: &[u8],
140    workload: &str,
141) -> anyhow::Result<(
142    InstancePre<Ctx<C>>,
143    Engine,
144    Arc<[ResourceType]>,
145    Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
146)>
147where
148    C: Invoke + Clone + 'static,
149    C::Context: Clone + 'static,
150{
151    let mut opts = wasmtime_cli_flags::CommonOptions::try_parse_from(iter::empty::<&'static str>())
152        .context("failed to construct common Wasmtime options")?;
153    let mut config = opts
154        .config(use_pooling_allocator_by_default().unwrap_or(None))
155        .context("failed to construct Wasmtime config")?;
156    config.wasm_component_model(true);
157    config.async_support(true);
158    let engine = wasmtime::Engine::new(&config).context("failed to initialize Wasmtime engine")?;
159
160    let wasm = if workload.starts_with('.') || workload.starts_with('/') {
161        fs::read(&workload)
162            .await
163            .with_context(|| format!("failed to read relative path to workload `{workload}`"))
164            .map(Workload::Binary)
165    } else {
166        Url::parse(workload)
167            .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
168            .map(Workload::Url)
169    }?;
170    let wasm = match wasm {
171        Workload::Url(wasm) => match wasm.scheme() {
172            "file" => {
173                let wasm = wasm
174                    .to_file_path()
175                    .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
176                fs::read(wasm)
177                    .await
178                    .context("failed to read Wasm from file URL")?
179            }
180            "http" | "https" => {
181                let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
182                let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
183                wasm.to_vec()
184            }
185            scheme => bail!("URL scheme `{scheme}` not supported"),
186        },
187        Workload::Binary(wasm) => wasm,
188    };
189    let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
190        wit_component::ComponentEncoder::default()
191            .validate(true)
192            .module(&wasm)
193            .context("failed to set core component module")?
194            .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
195            .context("failed to add WASI adapter")?
196            .encode()
197            .context("failed to encode a component")?
198    } else {
199        wasm
200    };
201
202    let component = Component::new(&engine, wasm).context("failed to compile component")?;
203
204    let mut linker = Linker::<Ctx<C>>::new(&engine);
205    wasmtime_wasi::add_to_linker_async(&mut linker).context("failed to link WASI")?;
206    wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
207        .context("failed to link `wasi:http`")?;
208    wrpc_runtime_wasmtime::rpc::add_to_linker(&mut linker).context("failed to link `wrpc:rpc`")?;
209
210    let ty = component.component_type();
211    let mut host_resources = BTreeMap::default();
212    let mut guest_resources = Vec::new();
213    collect_component_resource_imports(&engine, &ty, &mut host_resources);
214    collect_component_resource_exports(&engine, &ty, &mut guest_resources);
215    let io_err_tys = host_resources
216        .range::<str, _>((
217            Bound::Included("wasi:io/error@0.2"),
218            Bound::Excluded("wasi:io/error@0.3"),
219        ))
220        .map(|(name, instance)| {
221            instance
222                .get("error")
223                .copied()
224                .with_context(|| format!("{name} instance import missing `error` resource"))
225        })
226        .collect::<anyhow::Result<Box<[_]>>>()?;
227    let io_pollable_tys = host_resources
228        .range::<str, _>((
229            Bound::Included("wasi:io/poll@0.2"),
230            Bound::Excluded("wasi:io/poll@0.3"),
231        ))
232        .map(|(name, instance)| {
233            instance
234                .get("pollable")
235                .copied()
236                .with_context(|| format!("{name} instance import missing `pollable` resource"))
237        })
238        .collect::<anyhow::Result<Box<[_]>>>()?;
239    let io_input_stream_tys = host_resources
240        .range::<str, _>((
241            Bound::Included("wasi:io/streams@0.2"),
242            Bound::Excluded("wasi:io/streams@0.3"),
243        ))
244        .map(|(name, instance)| {
245            instance
246                .get("input-stream")
247                .copied()
248                .with_context(|| format!("{name} instance import missing `input-stream` resource"))
249        })
250        .collect::<anyhow::Result<Box<[_]>>>()?;
251    let io_output_stream_tys = host_resources
252        .range::<str, _>((
253            Bound::Included("wasi:io/streams@0.2"),
254            Bound::Excluded("wasi:io/streams@0.3"),
255        ))
256        .map(|(name, instance)| {
257            instance
258                .get("output-stream")
259                .copied()
260                .with_context(|| format!("{name} instance import missing `output-stream` resource"))
261        })
262        .collect::<anyhow::Result<Box<[_]>>>()?;
263    let rpc_err_ty = host_resources
264        .get("wrpc:rpc/error@0.1.0")
265        .map(|instance| {
266            instance
267                .get("error")
268                .copied()
269                .context("`wrpc:rpc/error@0.1.0` instance import missing `error` resource")
270        })
271        .transpose()?;
272    // TODO: This should include `wasi:http` resources
273    let host_resources = host_resources
274        .into_iter()
275        .map(|(name, instance)| {
276            let instance = instance
277                .into_iter()
278                .map(|(name, ty)| {
279                    let host_ty = match ty {
280                        ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
281                        ty if io_err_tys.contains(&ty) => {
282                            ResourceType::host::<wasmtime_wasi::bindings::io::error::Error>()
283                        }
284                        ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
285                            wasmtime_wasi::bindings::io::streams::InputStream,
286                        >(),
287                        ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
288                            wasmtime_wasi::bindings::io::streams::OutputStream,
289                        >(),
290                        ty if io_pollable_tys.contains(&ty) => {
291                            ResourceType::host::<wasmtime_wasi::bindings::io::poll::Pollable>()
292                        }
293                        _ => ResourceType::host::<RemoteResource>(),
294                    };
295                    (name, (ty, host_ty))
296                })
297                .collect::<HashMap<_, _>>();
298            (name, instance)
299        })
300        .collect::<HashMap<_, _>>();
301    let host_resources = Arc::from(host_resources);
302    let guest_resources = Arc::from(guest_resources);
303    for (name, item) in ty.imports(&engine) {
304        // Avoid polyfilling instances, for which static bindings are linked
305        match name.split_once('/').map(|(pkg, suffix)| {
306            suffix
307                .split_once('@')
308                .map_or((pkg, suffix, None), |(iface, version)| {
309                    (pkg, iface, Some(version))
310                })
311        }) {
312            Some(("wrpc:rpc", "transport" | "error" | "context" | "invoker", Some("0.1.0"))) => {}
313            Some((
314                "wasi:cli",
315                "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
316                | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
317                Some(version),
318            )) if is_0_2(version, 0) => {}
319            Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
320                if is_0_2(version, 0) => {}
321            Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
322            Some(("wasi:filesystem", "preopens" | "types", Some(version)))
323                if is_0_2(version, 0) => {}
324            Some((
325                "wasi:http",
326                "incoming-handler" | "outgoing-handler" | "types",
327                Some(version),
328            )) if is_0_2(version, 0) => {}
329            Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
330                if is_0_2(version, 0) => {}
331            Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
332                if is_0_2(version, 0) => {}
333            Some((
334                "wasi:sockets",
335                "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
336                | "udp-create-socket" | "udp",
337                Some(version),
338            )) if is_0_2(version, 0) => {}
339            _ => {
340                if let Err(err) = link_item(
341                    &engine,
342                    &mut linker.root(),
343                    Arc::clone(&guest_resources),
344                    Arc::clone(&host_resources),
345                    item,
346                    "",
347                    name,
348                ) {
349                    error!(?err, "failed to polyfill instance");
350                }
351            }
352        }
353    }
354
355    let pre = linker
356        .instantiate_pre(&component)
357        .context("failed to pre-instantiate component")?;
358    Ok((pre, engine, guest_resources, host_resources))
359}
360
361fn new_store<C: Invoke>(
362    engine: &Engine,
363    wrpc: C,
364    cx: C::Context,
365    arg0: &str,
366    timeout: Duration,
367) -> wasmtime::Store<Ctx<C>> {
368    Store::new(
369        engine,
370        Ctx {
371            wasi: WasiCtxBuilder::new()
372                .inherit_env()
373                .inherit_stdio()
374                .inherit_network()
375                .allow_ip_name_lookup(true)
376                .allow_tcp(true)
377                .allow_udp(true)
378                .args(&[arg0])
379                .build(),
380            http: WasiHttpCtx::new(),
381            table: ResourceTable::new(),
382            shared_resources: SharedResourceTable::default(),
383            wrpc,
384            cx,
385            timeout,
386        },
387    )
388}
389
390#[instrument(level = "trace", skip(clt, cx), ret(level = "trace"))]
391pub async fn handle_run<C>(
392    clt: C,
393    cx: C::Context,
394    timeout: Duration,
395    workload: &str,
396) -> anyhow::Result<()>
397where
398    C: Invoke + Clone + 'static,
399    C::Context: Clone + 'static,
400{
401    let (pre, engine, _, _) =
402        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, workload).await?;
403    let mut store = new_store(&engine, clt, cx, "command.wasm", timeout);
404    let cmd = wasmtime_wasi::bindings::CommandPre::new(pre)
405        .context("failed to construct `command` instance")?
406        .instantiate_async(&mut store)
407        .await
408        .context("failed to instantiate `command`")?;
409    cmd.wasi_cli_run()
410        .call_run(&mut store)
411        .await
412        .context("failed to run component")?
413        .map_err(|()| anyhow!("component failed"))
414}
415
416#[instrument(level = "trace", skip_all, ret(level = "trace"))]
417pub async fn serve_shared<C, S>(
418    handlers: &mut JoinSet<()>,
419    srv: S,
420    mut store: wasmtime::Store<Ctx<C>>,
421    pre: InstancePre<Ctx<C>>,
422    guest_resources: Arc<[ResourceType]>,
423    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
424) -> anyhow::Result<()>
425where
426    C: Invoke + 'static,
427    C::Context: Clone,
428    S: Serve,
429{
430    let span = Span::current();
431    let instance = pre
432        .instantiate_async(&mut store)
433        .await
434        .context("failed to instantiate component")?;
435    let engine = store.engine().clone();
436    let store = Arc::new(Mutex::new(store));
437    for (name, ty) in pre.component().component_type().exports(&engine) {
438        match (name, ty) {
439            (name, types::ComponentItem::ComponentFunc(ty)) => {
440                info!(?name, "serving root function");
441                let invocations = srv
442                    .serve_function_shared(
443                        Arc::clone(&store),
444                        instance,
445                        Arc::clone(&guest_resources),
446                        Arc::clone(&host_resources),
447                        ty,
448                        "",
449                        name,
450                    )
451                    .await?;
452                handlers.spawn(
453                    async move {
454                        let mut invocations = pin!(invocations);
455                        while let Some(invocation) = invocations.next().await {
456                            match invocation {
457                                Ok((_, fut)) => {
458                                    info!("serving root function invocation");
459                                    if let Err(err) = fut.await {
460                                        warn!(?err, "failed to serve root function invocation");
461                                    } else {
462                                        info!("successfully served root function invocation");
463                                    }
464                                }
465                                Err(err) => {
466                                    error!(?err, "failed to accept root function invocation");
467                                }
468                            }
469                        }
470                    }
471                    .instrument(span.clone()),
472                );
473            }
474            (_, types::ComponentItem::CoreFunc(_)) => {
475                warn!(name, "serving root core function exports not supported yet");
476            }
477            (_, types::ComponentItem::Module(_)) => {
478                warn!(name, "serving root module exports not supported yet");
479            }
480            (_, types::ComponentItem::Component(_)) => {
481                warn!(name, "serving root component exports not supported yet");
482            }
483            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
484                for (name, ty) in ty.exports(&engine) {
485                    match ty {
486                        types::ComponentItem::ComponentFunc(ty) => {
487                            info!(?name, "serving instance function");
488                            let invocations = srv
489                                .serve_function_shared(
490                                    Arc::clone(&store),
491                                    instance,
492                                    Arc::clone(&guest_resources),
493                                    Arc::clone(&host_resources),
494                                    ty,
495                                    instance_name,
496                                    name,
497                                )
498                                .await?;
499                            handlers.spawn(async move {
500                                let mut invocations = pin!(invocations);
501                                while let Some(invocation) = invocations.next().await {
502                                    match invocation {
503                                        Ok((_, fut)) => {
504                                            info!("serving instance function invocation");
505                                            if let Err(err) = fut.await {
506                                                warn!(
507                                                    ?err,
508                                                    "failed to serve instance function invocation"
509                                                );
510                                            } else {
511                                                info!(
512                                                    "successfully served instance function invocation"
513                                                );
514                                            }
515                                        }
516                                        Err(err) => {
517                                            error!(
518                                                ?err,
519                                                "failed to accept instance function invocation"
520                                            );
521                                        }
522                                    }
523                                }
524                            }
525                            .instrument(span.clone()));
526                        }
527                        types::ComponentItem::CoreFunc(_) => {
528                            warn!(
529                                instance_name,
530                                name, "serving instance core function exports not supported yet"
531                            );
532                        }
533                        types::ComponentItem::Module(_) => {
534                            warn!(
535                                instance_name,
536                                name, "serving instance module exports not supported yet"
537                            );
538                        }
539                        types::ComponentItem::Component(_) => {
540                            warn!(
541                                instance_name,
542                                name, "serving instance component exports not supported yet"
543                            );
544                        }
545                        types::ComponentItem::ComponentInstance(_) => {
546                            warn!(
547                                instance_name,
548                                name, "serving nested instance exports not supported yet"
549                            );
550                        }
551                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
552                    }
553                }
554            }
555            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
556        }
557    }
558    Ok(())
559}
560
561#[instrument(level = "trace", skip_all, ret(level = "trace"))]
562#[allow(clippy::too_many_arguments)]
563pub async fn serve_stateless<C, S>(
564    handlers: &mut JoinSet<()>,
565    srv: S,
566    clt: C,
567    cx: C::Context,
568    pre: InstancePre<Ctx<C>>,
569    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
570    engine: &Engine,
571    timeout: Duration,
572) -> anyhow::Result<()>
573where
574    C: Invoke + Clone + 'static,
575    C::Context: Clone + 'static,
576    S: Serve,
577{
578    let span = Span::current();
579    for (name, ty) in pre.component().component_type().exports(engine) {
580        match (name, ty) {
581            (name, types::ComponentItem::ComponentFunc(ty)) => {
582                let clt = clt.clone();
583                let cx = cx.clone();
584                let engine = engine.clone();
585                info!(?name, "serving root function");
586                let invocations = srv
587                    .serve_function(
588                        move || {
589                            new_store(&engine, clt.clone(), cx.clone(), "reactor.wasm", timeout)
590                        },
591                        pre.clone(),
592                        Arc::clone(&host_resources),
593                        ty,
594                        "",
595                        name,
596                    )
597                    .await?;
598                handlers.spawn(
599                    async move {
600                        let mut invocations = pin!(invocations);
601                        while let Some(invocation) = invocations.next().await {
602                            match invocation {
603                                Ok((_, fut)) => {
604                                    info!("serving root function invocation");
605                                    if let Err(err) = fut.await {
606                                        warn!(?err, "failed to serve root function invocation");
607                                    } else {
608                                        info!("successfully served root function invocation");
609                                    }
610                                }
611                                Err(err) => {
612                                    error!(?err, "failed to accept root function invocation");
613                                }
614                            }
615                        }
616                    }
617                    .instrument(span.clone()),
618                );
619            }
620            (_, types::ComponentItem::CoreFunc(_)) => {
621                warn!(name, "serving root core function exports not supported yet");
622            }
623            (_, types::ComponentItem::Module(_)) => {
624                warn!(name, "serving root module exports not supported yet");
625            }
626            (_, types::ComponentItem::Component(_)) => {
627                warn!(name, "serving root component exports not supported yet");
628            }
629            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
630                for (name, ty) in ty.exports(engine) {
631                    match ty {
632                        types::ComponentItem::ComponentFunc(ty) => {
633                            let clt = clt.clone();
634                            let engine = engine.clone();
635                            let cx = cx.clone();
636                            info!(?name, "serving instance function");
637                            let invocations = srv
638                                .serve_function(
639                                    move || {
640                                        new_store(
641                                            &engine,
642                                            clt.clone(),
643                                            cx.clone(),
644                                            "reactor.wasm",
645                                            timeout,
646                                        )
647                                    },
648                                    pre.clone(),
649                                    Arc::clone(&host_resources),
650                                    ty,
651                                    instance_name,
652                                    name,
653                                )
654                                .await?;
655                            handlers.spawn(async move {
656                                let mut invocations = pin!(invocations);
657                                while let Some(invocation) = invocations.next().await {
658                                    match invocation {
659                                        Ok((_, fut)) => {
660                                            info!("serving instance function invocation");
661                                            if let Err(err) = fut.await {
662                                                warn!(
663                                                    ?err,
664                                                    "failed to serve instance function invocation"
665                                                );
666                                            } else {
667                                                info!(
668                                                    "successfully served instance function invocation"
669                                                );
670                                            }
671                                        }
672                                        Err(err) => {
673                                            error!(
674                                                ?err,
675                                                "failed to accept instance function invocation"
676                                            );
677                                        }
678                                    }
679                                }
680                            }.instrument(span.clone()));
681                        }
682                        types::ComponentItem::CoreFunc(_) => {
683                            warn!(
684                                instance_name,
685                                name, "serving instance core function exports not supported yet"
686                            );
687                        }
688                        types::ComponentItem::Module(_) => {
689                            warn!(
690                                instance_name,
691                                name, "serving instance module exports not supported yet"
692                            );
693                        }
694                        types::ComponentItem::Component(_) => {
695                            warn!(
696                                instance_name,
697                                name, "serving instance component exports not supported yet"
698                            );
699                        }
700                        types::ComponentItem::ComponentInstance(_) => {
701                            warn!(
702                                instance_name,
703                                name, "serving nested instance exports not supported yet"
704                            );
705                        }
706                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
707                    }
708                }
709            }
710            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
711        }
712    }
713    Ok(())
714}
715
716#[instrument(level = "trace", skip(srv, clt, cx), ret(level = "trace"))]
717pub async fn handle_serve<C, S>(
718    srv: S,
719    clt: C,
720    cx: C::Context,
721    timeout: Duration,
722    workload: &str,
723) -> anyhow::Result<()>
724where
725    C: Invoke + Clone + 'static,
726    C::Context: Clone + 'static,
727    S: Serve,
728{
729    let (pre, engine, guest_resources, host_resources) =
730        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, workload).await?;
731
732    let mut handlers = JoinSet::new();
733    if guest_resources.is_empty() {
734        serve_stateless(
735            &mut handlers,
736            srv,
737            clt,
738            cx,
739            pre,
740            host_resources,
741            &engine,
742            timeout,
743        )
744        .await?;
745    } else {
746        serve_shared(
747            &mut handlers,
748            srv,
749            new_store(&engine, clt, cx, "reactor.wasm", timeout),
750            pre,
751            guest_resources,
752            host_resources,
753        )
754        .await?;
755    }
756    while let Some(res) = handlers.join_next().await {
757        if let Err(err) = res {
758            error!(?err, "handler failed");
759        }
760    }
761    Ok(())
762}
763
764#[instrument(level = "trace", ret(level = "trace"))]
765pub async fn run() -> anyhow::Result<()> {
766    wrpc_cli::tracing::init();
767    match Command::parse() {
768        Command::Nats(args) => nats::run(args).await,
769        Command::Tcp(args) => tcp::run(args).await,
770    }
771}