wrpc_wasmtime_cli/
lib.rs

1#![allow(clippy::type_complexity)]
2
3use core::iter;
4use core::ops::Bound;
5use core::pin::pin;
6use core::time::Duration;
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11use anyhow::{anyhow, bail, Context as _};
12use clap::Parser;
13use futures::StreamExt as _;
14use tokio::fs;
15use tokio::sync::Mutex;
16use tokio::task::JoinSet;
17use tracing::{error, info, instrument, warn, Instrument as _, Span};
18use url::Url;
19use wasi_preview1_component_adapter_provider::{
20    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
21    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
22};
23use wasmtime::component::{types, Component, InstancePre, Linker, ResourceTable, ResourceType};
24use wasmtime::{Engine, Store};
25use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
26use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
27use wrpc_runtime_wasmtime::{
28    collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
29    RemoteResource, ServeExt as _, SharedResourceTable, WrpcCtxView, WrpcView,
30};
31use wrpc_transport::{Invoke, Serve};
32
33mod nats;
34mod tcp;
35
36const DEFAULT_TIMEOUT: &str = "10s";
37
38#[derive(Parser, Debug)]
39#[command(author, version, about, long_about = None)]
40enum Command {
41    #[command(subcommand)]
42    Nats(nats::Command),
43    #[command(subcommand)]
44    Tcp(tcp::Command),
45}
46
47pub enum Workload {
48    Url(Url),
49    Binary(Vec<u8>),
50}
51
52pub struct WrpcCtx<C: Invoke> {
53    pub wrpc: C,
54    pub cx: C::Context,
55    pub shared_resources: SharedResourceTable,
56    pub timeout: Duration,
57}
58
59pub struct Ctx<C: Invoke> {
60    pub table: ResourceTable,
61    pub wasi: WasiCtx,
62    pub http: WasiHttpCtx,
63    pub wrpc: WrpcCtx<C>,
64}
65
66impl<C> wrpc_runtime_wasmtime::WrpcCtx<C> for WrpcCtx<C>
67where
68    C: Invoke,
69    C::Context: Clone,
70{
71    fn context(&self) -> C::Context {
72        self.cx.clone()
73    }
74
75    fn client(&self) -> &C {
76        &self.wrpc
77    }
78
79    fn shared_resources(&mut self) -> &mut SharedResourceTable {
80        &mut self.shared_resources
81    }
82
83    fn timeout(&self) -> Option<Duration> {
84        Some(self.timeout)
85    }
86}
87
88impl<C> WrpcView for Ctx<C>
89where
90    C: Invoke,
91    C::Context: Clone,
92{
93    type Invoke = C;
94
95    fn wrpc(&mut self) -> WrpcCtxView<'_, Self::Invoke> {
96        WrpcCtxView {
97            ctx: &mut self.wrpc,
98            table: &mut self.table,
99        }
100    }
101}
102
103impl<C: Invoke> WasiView for Ctx<C> {
104    fn ctx(&mut self) -> WasiCtxView<'_> {
105        WasiCtxView {
106            ctx: &mut self.wasi,
107            table: &mut self.table,
108        }
109    }
110}
111
112impl<C: Invoke> WasiHttpView for Ctx<C> {
113    fn ctx(&mut self) -> &mut WasiHttpCtx {
114        &mut self.http
115    }
116
117    fn table(&mut self) -> &mut ResourceTable {
118        &mut self.table
119    }
120}
121
122// https://github.com/bytecodealliance/wasmtime/blob/b943666650696f1eb7ff8b217762b58d5ef5779d/src/commands/serve.rs#L641-L656
123fn use_pooling_allocator_by_default() -> anyhow::Result<Option<bool>> {
124    const BITS_TO_TEST: u32 = 42;
125    let mut config = wasmtime::Config::new();
126    config.wasm_memory64(true);
127    config.memory_reservation(1 << BITS_TO_TEST);
128    let engine = wasmtime::Engine::new(&config)?;
129    let mut store = wasmtime::Store::new(&engine, ());
130    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
131    // page size here from the maximum size.
132    let ty = wasmtime::MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
133    if wasmtime::Memory::new(&mut store, ty).is_ok() {
134        Ok(Some(true))
135    } else {
136        Ok(None)
137    }
138}
139
140fn is_0_2(version: &str, min_patch: u64) -> bool {
141    if let Ok(semver::Version {
142        major,
143        minor,
144        patch,
145        pre,
146        build,
147    }) = version.parse()
148    {
149        major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
150    } else {
151        false
152    }
153}
154
155#[instrument(level = "trace", skip(adapter))]
156async fn instantiate_pre<C>(
157    adapter: &[u8],
158    workload: &str,
159) -> anyhow::Result<(
160    InstancePre<Ctx<C>>,
161    Engine,
162    Arc<[ResourceType]>,
163    Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
164)>
165where
166    C: Invoke + Clone + 'static,
167    C::Context: Clone + 'static,
168{
169    let mut opts = wasmtime_cli_flags::CommonOptions::try_parse_from(iter::empty::<&'static str>())
170        .context("failed to construct common Wasmtime options")?;
171    let mut config = opts
172        .config(use_pooling_allocator_by_default().unwrap_or(None))
173        .context("failed to construct Wasmtime config")?;
174    config.wasm_component_model(true);
175    config.async_support(true);
176    let engine = wasmtime::Engine::new(&config).context("failed to initialize Wasmtime engine")?;
177
178    let wasm = if workload.starts_with('.') || workload.starts_with('/') {
179        fs::read(&workload)
180            .await
181            .with_context(|| format!("failed to read relative path to workload `{workload}`"))
182            .map(Workload::Binary)
183    } else {
184        Url::parse(workload)
185            .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
186            .map(Workload::Url)
187    }?;
188    let wasm = match wasm {
189        Workload::Url(wasm) => match wasm.scheme() {
190            "file" => {
191                let wasm = wasm
192                    .to_file_path()
193                    .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
194                fs::read(wasm)
195                    .await
196                    .context("failed to read Wasm from file URL")?
197            }
198            "http" | "https" => {
199                let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
200                let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
201                wasm.to_vec()
202            }
203            scheme => bail!("URL scheme `{scheme}` not supported"),
204        },
205        Workload::Binary(wasm) => wasm,
206    };
207    let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
208        wit_component::ComponentEncoder::default()
209            .validate(true)
210            .module(&wasm)
211            .context("failed to set core component module")?
212            .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
213            .context("failed to add WASI adapter")?
214            .encode()
215            .context("failed to encode a component")?
216    } else {
217        wasm
218    };
219
220    let component = Component::new(&engine, wasm).context("failed to compile component")?;
221
222    let mut linker = Linker::<Ctx<C>>::new(&engine);
223    wasmtime_wasi::p2::add_to_linker_async(&mut linker).context("failed to link WASI")?;
224    wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)
225        .context("failed to link `wasi:http`")?;
226    wrpc_runtime_wasmtime::rpc::add_to_linker(&mut linker).context("failed to link `wrpc:rpc`")?;
227
228    let ty = component.component_type();
229    let mut host_resources = BTreeMap::default();
230    let mut guest_resources = Vec::new();
231    collect_component_resource_imports(&engine, &ty, &mut host_resources);
232    collect_component_resource_exports(&engine, &ty, &mut guest_resources);
233    let io_err_tys = host_resources
234        .range::<str, _>((
235            Bound::Included("wasi:io/error@0.2"),
236            Bound::Excluded("wasi:io/error@0.3"),
237        ))
238        .map(|(name, instance)| {
239            instance
240                .get("error")
241                .copied()
242                .with_context(|| format!("{name} instance import missing `error` resource"))
243        })
244        .collect::<anyhow::Result<Box<[_]>>>()?;
245    let io_pollable_tys = host_resources
246        .range::<str, _>((
247            Bound::Included("wasi:io/poll@0.2"),
248            Bound::Excluded("wasi:io/poll@0.3"),
249        ))
250        .map(|(name, instance)| {
251            instance
252                .get("pollable")
253                .copied()
254                .with_context(|| format!("{name} instance import missing `pollable` resource"))
255        })
256        .collect::<anyhow::Result<Box<[_]>>>()?;
257    let io_input_stream_tys = host_resources
258        .range::<str, _>((
259            Bound::Included("wasi:io/streams@0.2"),
260            Bound::Excluded("wasi:io/streams@0.3"),
261        ))
262        .map(|(name, instance)| {
263            instance
264                .get("input-stream")
265                .copied()
266                .with_context(|| format!("{name} instance import missing `input-stream` resource"))
267        })
268        .collect::<anyhow::Result<Box<[_]>>>()?;
269    let io_output_stream_tys = host_resources
270        .range::<str, _>((
271            Bound::Included("wasi:io/streams@0.2"),
272            Bound::Excluded("wasi:io/streams@0.3"),
273        ))
274        .map(|(name, instance)| {
275            instance
276                .get("output-stream")
277                .copied()
278                .with_context(|| format!("{name} instance import missing `output-stream` resource"))
279        })
280        .collect::<anyhow::Result<Box<[_]>>>()?;
281    let rpc_err_ty = host_resources
282        .get("wrpc:rpc/error@0.1.0")
283        .map(|instance| {
284            instance
285                .get("error")
286                .copied()
287                .context("`wrpc:rpc/error@0.1.0` instance import missing `error` resource")
288        })
289        .transpose()?;
290    // TODO: This should include `wasi:http` resources
291    let host_resources = host_resources
292        .into_iter()
293        .map(|(name, instance)| {
294            let instance = instance
295                .into_iter()
296                .map(|(name, ty)| {
297                    let host_ty = match ty {
298                        ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
299                        ty if io_err_tys.contains(&ty) => {
300                            ResourceType::host::<wasmtime_wasi::p2::bindings::io::error::Error>()
301                        }
302                        ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
303                            wasmtime_wasi::p2::bindings::io::streams::InputStream,
304                        >(),
305                        ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
306                            wasmtime_wasi::p2::bindings::io::streams::OutputStream,
307                        >(),
308                        ty if io_pollable_tys.contains(&ty) => {
309                            ResourceType::host::<wasmtime_wasi::p2::bindings::io::poll::Pollable>()
310                        }
311                        _ => ResourceType::host::<RemoteResource>(),
312                    };
313                    (name, (ty, host_ty))
314                })
315                .collect::<HashMap<_, _>>();
316            (name, instance)
317        })
318        .collect::<HashMap<_, _>>();
319    let host_resources = Arc::from(host_resources);
320    let guest_resources = Arc::from(guest_resources);
321    for (name, item) in ty.imports(&engine) {
322        // Avoid polyfilling instances, for which static bindings are linked
323        match name.split_once('/').map(|(pkg, suffix)| {
324            suffix
325                .split_once('@')
326                .map_or((pkg, suffix, None), |(iface, version)| {
327                    (pkg, iface, Some(version))
328                })
329        }) {
330            Some(("wrpc:rpc", "transport" | "error" | "context" | "invoker", Some("0.1.0"))) => {}
331            Some((
332                "wasi:cli",
333                "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
334                | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
335                Some(version),
336            )) if is_0_2(version, 0) => {}
337            Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
338                if is_0_2(version, 0) => {}
339            Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
340            Some(("wasi:filesystem", "preopens" | "types", Some(version)))
341                if is_0_2(version, 0) => {}
342            Some((
343                "wasi:http",
344                "incoming-handler" | "outgoing-handler" | "types",
345                Some(version),
346            )) if is_0_2(version, 0) => {}
347            Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
348                if is_0_2(version, 0) => {}
349            Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
350                if is_0_2(version, 0) => {}
351            Some((
352                "wasi:sockets",
353                "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
354                | "udp-create-socket" | "udp",
355                Some(version),
356            )) if is_0_2(version, 0) => {}
357            _ => {
358                if let Err(err) = link_item(
359                    &engine,
360                    &mut linker.root(),
361                    Arc::clone(&guest_resources),
362                    Arc::clone(&host_resources),
363                    item,
364                    "",
365                    name,
366                ) {
367                    error!(?err, "failed to polyfill instance");
368                }
369            }
370        }
371    }
372
373    let pre = linker
374        .instantiate_pre(&component)
375        .context("failed to pre-instantiate component")?;
376    Ok((pre, engine, guest_resources, host_resources))
377}
378
379fn new_store<C: Invoke>(
380    engine: &Engine,
381    wrpc: C,
382    cx: C::Context,
383    arg0: &str,
384    timeout: Duration,
385) -> wasmtime::Store<Ctx<C>> {
386    Store::new(
387        engine,
388        Ctx {
389            wasi: WasiCtxBuilder::new()
390                .inherit_env()
391                .inherit_stdio()
392                .inherit_network()
393                .allow_ip_name_lookup(true)
394                .allow_tcp(true)
395                .allow_udp(true)
396                .args(&[arg0])
397                .build(),
398            http: WasiHttpCtx::new(),
399            table: ResourceTable::new(),
400            wrpc: WrpcCtx {
401                wrpc,
402                cx,
403                shared_resources: SharedResourceTable::default(),
404                timeout,
405            },
406        },
407    )
408}
409
410#[instrument(level = "trace", skip(clt, cx), ret(level = "trace"))]
411pub async fn handle_run<C>(
412    clt: C,
413    cx: C::Context,
414    timeout: Duration,
415    workload: &str,
416) -> anyhow::Result<()>
417where
418    C: Invoke + Clone + 'static,
419    C::Context: Clone + 'static,
420{
421    let (pre, engine, _, _) =
422        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, workload).await?;
423    let mut store = new_store(&engine, clt, cx, "command.wasm", timeout);
424    let cmd = wasmtime_wasi::p2::bindings::CommandPre::new(pre)
425        .context("failed to construct `command` instance")?
426        .instantiate_async(&mut store)
427        .await
428        .context("failed to instantiate `command`")?;
429    cmd.wasi_cli_run()
430        .call_run(&mut store)
431        .await
432        .context("failed to run component")?
433        .map_err(|()| anyhow!("component failed"))
434}
435
436#[instrument(level = "trace", skip_all, ret(level = "trace"))]
437pub async fn serve_shared<C, S>(
438    handlers: &mut JoinSet<()>,
439    srv: S,
440    mut store: wasmtime::Store<Ctx<C>>,
441    pre: InstancePre<Ctx<C>>,
442    guest_resources: Arc<[ResourceType]>,
443    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
444) -> anyhow::Result<()>
445where
446    C: Invoke + 'static,
447    C::Context: Clone,
448    S: Serve,
449{
450    let span = Span::current();
451    let instance = pre
452        .instantiate_async(&mut store)
453        .await
454        .context("failed to instantiate component")?;
455    let engine = store.engine().clone();
456    let store = Arc::new(Mutex::new(store));
457    for (name, ty) in pre.component().component_type().exports(&engine) {
458        match (name, ty) {
459            (name, types::ComponentItem::ComponentFunc(ty)) => {
460                info!(?name, "serving root function");
461                let invocations = srv
462                    .serve_function_shared(
463                        Arc::clone(&store),
464                        instance,
465                        Arc::clone(&guest_resources),
466                        Arc::clone(&host_resources),
467                        ty,
468                        "",
469                        name,
470                    )
471                    .await?;
472                handlers.spawn(
473                    async move {
474                        let mut invocations = pin!(invocations);
475                        while let Some(invocation) = invocations.next().await {
476                            match invocation {
477                                Ok((_, fut)) => {
478                                    info!("serving root function invocation");
479                                    if let Err(err) = fut.await {
480                                        warn!(?err, "failed to serve root function invocation");
481                                    } else {
482                                        info!("successfully served root function invocation");
483                                    }
484                                }
485                                Err(err) => {
486                                    error!(?err, "failed to accept root function invocation");
487                                }
488                            }
489                        }
490                    }
491                    .instrument(span.clone()),
492                );
493            }
494            (_, types::ComponentItem::CoreFunc(_)) => {
495                warn!(name, "serving root core function exports not supported yet");
496            }
497            (_, types::ComponentItem::Module(_)) => {
498                warn!(name, "serving root module exports not supported yet");
499            }
500            (_, types::ComponentItem::Component(_)) => {
501                warn!(name, "serving root component exports not supported yet");
502            }
503            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
504                for (name, ty) in ty.exports(&engine) {
505                    match ty {
506                        types::ComponentItem::ComponentFunc(ty) => {
507                            info!(?name, "serving instance function");
508                            let invocations = srv
509                                .serve_function_shared(
510                                    Arc::clone(&store),
511                                    instance,
512                                    Arc::clone(&guest_resources),
513                                    Arc::clone(&host_resources),
514                                    ty,
515                                    instance_name,
516                                    name,
517                                )
518                                .await?;
519                            handlers.spawn(async move {
520                                let mut invocations = pin!(invocations);
521                                while let Some(invocation) = invocations.next().await {
522                                    match invocation {
523                                        Ok((_, fut)) => {
524                                            info!("serving instance function invocation");
525                                            if let Err(err) = fut.await {
526                                                warn!(
527                                                    ?err,
528                                                    "failed to serve instance function invocation"
529                                                );
530                                            } else {
531                                                info!(
532                                                    "successfully served instance function invocation"
533                                                );
534                                            }
535                                        }
536                                        Err(err) => {
537                                            error!(
538                                                ?err,
539                                                "failed to accept instance function invocation"
540                                            );
541                                        }
542                                    }
543                                }
544                            }
545                            .instrument(span.clone()));
546                        }
547                        types::ComponentItem::CoreFunc(_) => {
548                            warn!(
549                                instance_name,
550                                name, "serving instance core function exports not supported yet"
551                            );
552                        }
553                        types::ComponentItem::Module(_) => {
554                            warn!(
555                                instance_name,
556                                name, "serving instance module exports not supported yet"
557                            );
558                        }
559                        types::ComponentItem::Component(_) => {
560                            warn!(
561                                instance_name,
562                                name, "serving instance component exports not supported yet"
563                            );
564                        }
565                        types::ComponentItem::ComponentInstance(_) => {
566                            warn!(
567                                instance_name,
568                                name, "serving nested instance exports not supported yet"
569                            );
570                        }
571                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
572                    }
573                }
574            }
575            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
576        }
577    }
578    Ok(())
579}
580
581#[instrument(level = "trace", skip_all, ret(level = "trace"))]
582#[allow(clippy::too_many_arguments)]
583pub async fn serve_stateless<C, S>(
584    handlers: &mut JoinSet<()>,
585    srv: S,
586    clt: C,
587    cx: C::Context,
588    pre: InstancePre<Ctx<C>>,
589    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
590    engine: &Engine,
591    timeout: Duration,
592) -> anyhow::Result<()>
593where
594    C: Invoke + Clone + 'static,
595    C::Context: Clone + 'static,
596    S: Serve,
597{
598    let span = Span::current();
599    for (name, ty) in pre.component().component_type().exports(engine) {
600        match (name, ty) {
601            (name, types::ComponentItem::ComponentFunc(ty)) => {
602                let clt = clt.clone();
603                let cx = cx.clone();
604                let engine = engine.clone();
605                info!(?name, "serving root function");
606                let invocations = srv
607                    .serve_function(
608                        move || {
609                            new_store(&engine, clt.clone(), cx.clone(), "reactor.wasm", timeout)
610                        },
611                        pre.clone(),
612                        Arc::clone(&host_resources),
613                        ty,
614                        "",
615                        name,
616                    )
617                    .await?;
618                handlers.spawn(
619                    async move {
620                        let mut invocations = pin!(invocations);
621                        while let Some(invocation) = invocations.next().await {
622                            match invocation {
623                                Ok((_, fut)) => {
624                                    info!("serving root function invocation");
625                                    if let Err(err) = fut.await {
626                                        warn!(?err, "failed to serve root function invocation");
627                                    } else {
628                                        info!("successfully served root function invocation");
629                                    }
630                                }
631                                Err(err) => {
632                                    error!(?err, "failed to accept root function invocation");
633                                }
634                            }
635                        }
636                    }
637                    .instrument(span.clone()),
638                );
639            }
640            (_, types::ComponentItem::CoreFunc(_)) => {
641                warn!(name, "serving root core function exports not supported yet");
642            }
643            (_, types::ComponentItem::Module(_)) => {
644                warn!(name, "serving root module exports not supported yet");
645            }
646            (_, types::ComponentItem::Component(_)) => {
647                warn!(name, "serving root component exports not supported yet");
648            }
649            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
650                for (name, ty) in ty.exports(engine) {
651                    match ty {
652                        types::ComponentItem::ComponentFunc(ty) => {
653                            let clt = clt.clone();
654                            let engine = engine.clone();
655                            let cx = cx.clone();
656                            info!(?name, "serving instance function");
657                            let invocations = srv
658                                .serve_function(
659                                    move || {
660                                        new_store(
661                                            &engine,
662                                            clt.clone(),
663                                            cx.clone(),
664                                            "reactor.wasm",
665                                            timeout,
666                                        )
667                                    },
668                                    pre.clone(),
669                                    Arc::clone(&host_resources),
670                                    ty,
671                                    instance_name,
672                                    name,
673                                )
674                                .await?;
675                            handlers.spawn(async move {
676                                let mut invocations = pin!(invocations);
677                                while let Some(invocation) = invocations.next().await {
678                                    match invocation {
679                                        Ok((_, fut)) => {
680                                            info!("serving instance function invocation");
681                                            if let Err(err) = fut.await {
682                                                warn!(
683                                                    ?err,
684                                                    "failed to serve instance function invocation"
685                                                );
686                                            } else {
687                                                info!(
688                                                    "successfully served instance function invocation"
689                                                );
690                                            }
691                                        }
692                                        Err(err) => {
693                                            error!(
694                                                ?err,
695                                                "failed to accept instance function invocation"
696                                            );
697                                        }
698                                    }
699                                }
700                            }.instrument(span.clone()));
701                        }
702                        types::ComponentItem::CoreFunc(_) => {
703                            warn!(
704                                instance_name,
705                                name, "serving instance core function exports not supported yet"
706                            );
707                        }
708                        types::ComponentItem::Module(_) => {
709                            warn!(
710                                instance_name,
711                                name, "serving instance module exports not supported yet"
712                            );
713                        }
714                        types::ComponentItem::Component(_) => {
715                            warn!(
716                                instance_name,
717                                name, "serving instance component exports not supported yet"
718                            );
719                        }
720                        types::ComponentItem::ComponentInstance(_) => {
721                            warn!(
722                                instance_name,
723                                name, "serving nested instance exports not supported yet"
724                            );
725                        }
726                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
727                    }
728                }
729            }
730            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
731        }
732    }
733    Ok(())
734}
735
736#[instrument(level = "trace", skip(srv, clt, cx), ret(level = "trace"))]
737pub async fn handle_serve<C, S>(
738    srv: S,
739    clt: C,
740    cx: C::Context,
741    timeout: Duration,
742    workload: &str,
743) -> anyhow::Result<()>
744where
745    C: Invoke + Clone + 'static,
746    C::Context: Clone + 'static,
747    S: Serve,
748{
749    let (pre, engine, guest_resources, host_resources) =
750        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, workload).await?;
751
752    let mut handlers = JoinSet::new();
753    if guest_resources.is_empty() {
754        serve_stateless(
755            &mut handlers,
756            srv,
757            clt,
758            cx,
759            pre,
760            host_resources,
761            &engine,
762            timeout,
763        )
764        .await?;
765    } else {
766        serve_shared(
767            &mut handlers,
768            srv,
769            new_store(&engine, clt, cx, "reactor.wasm", timeout),
770            pre,
771            guest_resources,
772            host_resources,
773        )
774        .await?;
775    }
776    while let Some(res) = handlers.join_next().await {
777        if let Err(err) = res {
778            error!(?err, "handler failed");
779        }
780    }
781    Ok(())
782}
783
784#[instrument(level = "trace", ret(level = "trace"))]
785pub async fn run() -> anyhow::Result<()> {
786    wrpc_cli::tracing::init();
787    match Command::parse() {
788        Command::Nats(args) => nats::run(args).await,
789        Command::Tcp(args) => tcp::run(args).await,
790    }
791}