Skip to main content

wrpc_wasmtime_cli/
lib.rs

1#![allow(clippy::type_complexity)]
2
3use core::iter;
4use core::ops::Bound;
5use core::pin::pin;
6use core::time::Duration;
7
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
11use anyhow::{anyhow, bail, Context as _};
12use clap::Parser;
13use futures::StreamExt as _;
14use tokio::fs;
15use tokio::sync::Mutex;
16use tokio::task::JoinSet;
17use tracing::{error, info, instrument, warn, Instrument as _, Span};
18use url::Url;
19use wasi_preview1_component_adapter_provider::{
20    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
21    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
22};
23use wasmtime::component::{types, Component, InstancePre, Linker, ResourceTable, ResourceType};
24use wasmtime::{Engine, Store};
25use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
26use wasmtime_wasi_http::p2::{WasiHttpCtxView, WasiHttpView};
27use wasmtime_wasi_http::WasiHttpCtx;
28use wrpc_runtime_wasmtime::{
29    collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
30    RemoteResource, ServeExt as _, SharedResourceTable, WrpcCtxView, WrpcView,
31};
32use wrpc_transport::{Invoke, Serve};
33
34mod nats;
35mod tcp;
36
/// Default timeout, spelled as a human-readable duration string.
///
/// NOTE(review): nothing in this file reads the constant; presumably the `nats` and `tcp`
/// modules use it as the default for a CLI flag — confirm against those modules.
const DEFAULT_TIMEOUT: &str = "10s";
38
39#[derive(Parser, Debug)]
40#[command(author, version, about, long_about = None)]
41enum Command {
42    #[command(subcommand)]
43    Nats(nats::Command),
44    #[command(subcommand)]
45    Tcp(tcp::Command),
46}
47
48pub enum Workload {
49    Url(Url),
50    Binary(Vec<u8>),
51}
52
53pub struct WrpcCtx<C: Invoke> {
54    pub wrpc: C,
55    pub cx: C::Context,
56    pub shared_resources: SharedResourceTable,
57    pub timeout: Duration,
58}
59
60pub struct Ctx<C: Invoke> {
61    pub table: ResourceTable,
62    pub wasi: WasiCtx,
63    pub http: WasiHttpCtx,
64    pub wrpc: WrpcCtx<C>,
65}
66
67impl<C> wrpc_runtime_wasmtime::WrpcCtx<C> for WrpcCtx<C>
68where
69    C: Invoke,
70    C::Context: Clone,
71{
72    fn context(&self) -> C::Context {
73        self.cx.clone()
74    }
75
76    fn client(&self) -> &C {
77        &self.wrpc
78    }
79
80    fn shared_resources(&mut self) -> &mut SharedResourceTable {
81        &mut self.shared_resources
82    }
83
84    fn timeout(&self) -> Option<Duration> {
85        Some(self.timeout)
86    }
87}
88
89impl<C> WrpcView for Ctx<C>
90where
91    C: Invoke,
92    C::Context: Clone,
93{
94    type Invoke = C;
95
96    fn wrpc(&mut self) -> WrpcCtxView<'_, Self::Invoke> {
97        WrpcCtxView {
98            ctx: &mut self.wrpc,
99            table: &mut self.table,
100        }
101    }
102}
103
104impl<C: Invoke> WasiView for Ctx<C> {
105    fn ctx(&mut self) -> WasiCtxView<'_> {
106        WasiCtxView {
107            ctx: &mut self.wasi,
108            table: &mut self.table,
109        }
110    }
111}
112
113impl<C: Invoke> WasiHttpView for Ctx<C> {
114    fn http(&mut self) -> WasiHttpCtxView<'_> {
115        WasiHttpCtxView {
116            ctx: &mut self.http,
117            table: &mut self.table,
118            hooks: wasmtime_wasi_http::p2::default_hooks(),
119        }
120    }
121}
122
123// https://github.com/bytecodealliance/wasmtime/blob/b943666650696f1eb7ff8b217762b58d5ef5779d/src/commands/serve.rs#L641-L656
124fn use_pooling_allocator_by_default() -> anyhow::Result<Option<bool>> {
125    const BITS_TO_TEST: u32 = 42;
126    let mut config = wasmtime::Config::new();
127    config.wasm_memory64(true);
128    config.memory_reservation(1 << BITS_TO_TEST);
129    let engine = wasmtime::Engine::new(&config)?;
130    let mut store = wasmtime::Store::new(&engine, ());
131    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
132    // page size here from the maximum size.
133    let ty = wasmtime::MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
134    if wasmtime::Memory::new(&mut store, ty).is_ok() {
135        Ok(Some(true))
136    } else {
137        Ok(None)
138    }
139}
140
141fn is_0_2(version: &str, min_patch: u64) -> bool {
142    if let Ok(semver::Version {
143        major,
144        minor,
145        patch,
146        pre,
147        build,
148    }) = version.parse()
149    {
150        major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
151    } else {
152        false
153    }
154}
155
156#[instrument(level = "trace", skip(adapter))]
157async fn instantiate_pre<C>(
158    adapter: &[u8],
159    workload: &str,
160) -> anyhow::Result<(
161    InstancePre<Ctx<C>>,
162    Engine,
163    Arc<[ResourceType]>,
164    Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
165)>
166where
167    C: Invoke + Clone + 'static,
168    C::Context: Clone + 'static,
169{
170    let mut opts = wasmtime_cli_flags::CommonOptions::try_parse_from(iter::empty::<&'static str>())
171        .context("failed to construct common Wasmtime options")?;
172    let mut config = opts
173        .config(use_pooling_allocator_by_default().unwrap_or(None))
174        .map_err(anyhow::Error::from)
175        .context("failed to construct Wasmtime config")?;
176    config.wasm_component_model(true);
177    let engine = wasmtime::Engine::new(&config)
178        .map_err(anyhow::Error::from)
179        .context("failed to initialize Wasmtime engine")?;
180
181    let wasm = if workload.starts_with('.') || workload.starts_with('/') {
182        fs::read(&workload)
183            .await
184            .with_context(|| format!("failed to read relative path to workload `{workload}`"))
185            .map(Workload::Binary)
186    } else {
187        Url::parse(workload)
188            .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
189            .map(Workload::Url)
190    }?;
191    let wasm = match wasm {
192        Workload::Url(wasm) => match wasm.scheme() {
193            "file" => {
194                let wasm = wasm
195                    .to_file_path()
196                    .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
197                fs::read(wasm)
198                    .await
199                    .context("failed to read Wasm from file URL")?
200            }
201            "http" | "https" => {
202                let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
203                let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
204                wasm.to_vec()
205            }
206            scheme => bail!("URL scheme `{scheme}` not supported"),
207        },
208        Workload::Binary(wasm) => wasm,
209    };
210    let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
211        wit_component::ComponentEncoder::default()
212            .validate(true)
213            .module(&wasm)
214            .context("failed to set core component module")?
215            .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
216            .context("failed to add WASI adapter")?
217            .encode()
218            .context("failed to encode a component")?
219    } else {
220        wasm
221    };
222
223    let component = Component::new(&engine, wasm)
224        .map_err(anyhow::Error::from)
225        .context("failed to compile component")?;
226
227    let mut linker = Linker::<Ctx<C>>::new(&engine);
228    wasmtime_wasi::p2::add_to_linker_async(&mut linker)
229        .map_err(anyhow::Error::from)
230        .context("failed to link WASI")?;
231    wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker)
232        .map_err(anyhow::Error::from)
233        .context("failed to link `wasi:http`")?;
234    wrpc_runtime_wasmtime::rpc::add_to_linker(&mut linker)
235        .map_err(anyhow::Error::from)
236        .context("failed to link `wrpc:rpc`")?;
237
238    let ty = component.component_type();
239    let mut host_resources = BTreeMap::default();
240    let mut guest_resources = Vec::new();
241    collect_component_resource_imports(&engine, &ty, &mut host_resources);
242    collect_component_resource_exports(&engine, &ty, &mut guest_resources);
243    let io_err_tys = host_resources
244        .range::<str, _>((
245            Bound::Included("wasi:io/error@0.2"),
246            Bound::Excluded("wasi:io/error@0.3"),
247        ))
248        .flat_map(|(_, instance)| instance.get("error"))
249        .copied()
250        .collect::<Box<[_]>>();
251    let io_pollable_tys = host_resources
252        .range::<str, _>((
253            Bound::Included("wasi:io/poll@0.2"),
254            Bound::Excluded("wasi:io/poll@0.3"),
255        ))
256        .flat_map(|(_, instance)| instance.get("pollable"))
257        .copied()
258        .collect::<Box<[_]>>();
259    let io_input_stream_tys = host_resources
260        .range::<str, _>((
261            Bound::Included("wasi:io/streams@0.2"),
262            Bound::Excluded("wasi:io/streams@0.3"),
263        ))
264        .flat_map(|(_, instance)| instance.get("input-stream"))
265        .copied()
266        .collect::<Box<[_]>>();
267    let io_output_stream_tys = host_resources
268        .range::<str, _>((
269            Bound::Included("wasi:io/streams@0.2"),
270            Bound::Excluded("wasi:io/streams@0.3"),
271        ))
272        .flat_map(|(_, instance)| instance.get("output-stream"))
273        .copied()
274        .collect::<Box<[_]>>();
275    let rpc_err_ty = host_resources
276        .get("wrpc:rpc/error@0.1.0")
277        .and_then(|instance| instance.get("error"))
278        .copied();
279    // TODO: This should include `wasi:http` resources
280    let host_resources = host_resources
281        .into_iter()
282        .map(|(name, instance)| {
283            let instance = instance
284                .into_iter()
285                .map(|(name, ty)| {
286                    let host_ty = match ty {
287                        ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
288                        ty if io_err_tys.contains(&ty) => {
289                            ResourceType::host::<wasmtime_wasi::p2::bindings::io::error::Error>()
290                        }
291                        ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
292                            wasmtime_wasi::p2::bindings::io::streams::InputStream,
293                        >(),
294                        ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
295                            wasmtime_wasi::p2::bindings::io::streams::OutputStream,
296                        >(),
297                        ty if io_pollable_tys.contains(&ty) => {
298                            ResourceType::host::<wasmtime_wasi::p2::bindings::io::poll::Pollable>()
299                        }
300                        _ => ResourceType::host::<RemoteResource>(),
301                    };
302                    (name, (ty, host_ty))
303                })
304                .collect::<HashMap<_, _>>();
305            (name, instance)
306        })
307        .collect::<HashMap<_, _>>();
308    let host_resources = Arc::from(host_resources);
309    let guest_resources = Arc::from(guest_resources);
310    for (name, item) in ty.imports(&engine) {
311        // Avoid polyfilling instances, for which static bindings are linked
312        match name.split_once('/').map(|(pkg, suffix)| {
313            suffix
314                .split_once('@')
315                .map_or((pkg, suffix, None), |(iface, version)| {
316                    (pkg, iface, Some(version))
317                })
318        }) {
319            Some(("wrpc:rpc", "transport" | "error" | "context" | "invoker", Some("0.1.0"))) => {}
320            Some((
321                "wasi:cli",
322                "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
323                | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
324                Some(version),
325            )) if is_0_2(version, 0) => {}
326            Some(("wasi:clocks", "monotonic-clock" | "wall-clock", Some(version)))
327                if is_0_2(version, 0) => {}
328            Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
329            Some(("wasi:filesystem", "preopens" | "types", Some(version)))
330                if is_0_2(version, 0) => {}
331            Some((
332                "wasi:http",
333                "incoming-handler" | "outgoing-handler" | "types",
334                Some(version),
335            )) if is_0_2(version, 0) => {}
336            Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
337                if is_0_2(version, 0) => {}
338            Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
339                if is_0_2(version, 0) => {}
340            Some((
341                "wasi:sockets",
342                "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
343                | "udp-create-socket" | "udp",
344                Some(version),
345            )) if is_0_2(version, 0) => {}
346            _ => {
347                if let Err(err) = link_item(
348                    &engine,
349                    &mut linker.root(),
350                    Arc::clone(&guest_resources),
351                    Arc::clone(&host_resources),
352                    item,
353                    "",
354                    name,
355                ) {
356                    error!(?err, "failed to polyfill instance");
357                }
358            }
359        }
360    }
361
362    let pre = linker
363        .instantiate_pre(&component)
364        .map_err(anyhow::Error::from)
365        .context("failed to pre-instantiate component")?;
366    Ok((pre, engine, guest_resources, host_resources))
367}
368
369fn new_store<C: Invoke>(
370    engine: &Engine,
371    wrpc: C,
372    cx: C::Context,
373    arg0: &str,
374    timeout: Duration,
375) -> wasmtime::Store<Ctx<C>> {
376    Store::new(
377        engine,
378        Ctx {
379            wasi: WasiCtxBuilder::new()
380                .inherit_env()
381                .inherit_stdio()
382                .inherit_network()
383                .allow_ip_name_lookup(true)
384                .allow_tcp(true)
385                .allow_udp(true)
386                .args(&[arg0])
387                .build(),
388            http: WasiHttpCtx::new(),
389            table: ResourceTable::new(),
390            wrpc: WrpcCtx {
391                wrpc,
392                cx,
393                shared_resources: SharedResourceTable::default(),
394                timeout,
395            },
396        },
397    )
398}
399
400#[instrument(level = "trace", skip(clt, cx), ret(level = "trace"))]
401pub async fn handle_run<C>(
402    clt: C,
403    cx: C::Context,
404    timeout: Duration,
405    workload: &str,
406) -> anyhow::Result<()>
407where
408    C: Invoke + Clone + 'static,
409    C::Context: Clone + 'static,
410{
411    let (pre, engine, _, _) =
412        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, workload).await?;
413    let mut store = new_store(&engine, clt, cx, "command.wasm", timeout);
414    let cmd = wasmtime_wasi::p2::bindings::CommandPre::new(pre)
415        .map_err(anyhow::Error::from)
416        .context("failed to construct `command` instance")?
417        .instantiate_async(&mut store)
418        .await
419        .map_err(anyhow::Error::from)
420        .context("failed to instantiate `command`")?;
421    cmd.wasi_cli_run()
422        .call_run(&mut store)
423        .await
424        .map_err(anyhow::Error::from)
425        .context("failed to run component")?
426        .map_err(|()| anyhow!("component failed"))
427}
428
429#[instrument(level = "trace", skip_all, ret(level = "trace"))]
430pub async fn serve_shared<C, S>(
431    handlers: &mut JoinSet<()>,
432    srv: S,
433    mut store: wasmtime::Store<Ctx<C>>,
434    pre: InstancePre<Ctx<C>>,
435    guest_resources: Arc<[ResourceType]>,
436    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
437) -> anyhow::Result<()>
438where
439    C: Invoke + 'static,
440    C::Context: Clone,
441    S: Serve,
442{
443    let span = Span::current();
444    let instance = pre
445        .instantiate_async(&mut store)
446        .await
447        .map_err(anyhow::Error::from)
448        .context("failed to instantiate component")?;
449    let engine = store.engine().clone();
450    let io_stream_resources: Arc<[ResourceType]> =
451        wrpc_runtime_wasmtime::paths::wasi_io_stream_resources(
452            &engine,
453            &pre.component().component_type(),
454        )
455        .into();
456    let store = Arc::new(Mutex::new(store));
457    for (name, ty) in pre.component().component_type().exports(&engine) {
458        match (name, ty) {
459            (name, types::ComponentItem::ComponentFunc(ty)) => {
460                info!(?name, "serving root function");
461                let invocations = srv
462                    .serve_function_shared(
463                        Arc::clone(&store),
464                        instance,
465                        Arc::clone(&guest_resources),
466                        Arc::clone(&host_resources),
467                        Arc::clone(&io_stream_resources),
468                        ty,
469                        "",
470                        name,
471                    )
472                    .await?;
473                handlers.spawn(
474                    async move {
475                        let mut invocations = pin!(invocations);
476                        while let Some(invocation) = invocations.next().await {
477                            match invocation {
478                                Ok((_, fut)) => {
479                                    info!("serving root function invocation");
480                                    if let Err(err) = fut.await {
481                                        warn!(?err, "failed to serve root function invocation");
482                                    } else {
483                                        info!("successfully served root function invocation");
484                                    }
485                                }
486                                Err(err) => {
487                                    error!(?err, "failed to accept root function invocation");
488                                }
489                            }
490                        }
491                    }
492                    .instrument(span.clone()),
493                );
494            }
495            (_, types::ComponentItem::CoreFunc(_)) => {
496                warn!(name, "serving root core function exports not supported yet");
497            }
498            (_, types::ComponentItem::Module(_)) => {
499                warn!(name, "serving root module exports not supported yet");
500            }
501            (_, types::ComponentItem::Component(_)) => {
502                warn!(name, "serving root component exports not supported yet");
503            }
504            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
505                for (name, ty) in ty.exports(&engine) {
506                    match ty {
507                        types::ComponentItem::ComponentFunc(ty) => {
508                            info!(?name, "serving instance function");
509                            let invocations = srv
510                                .serve_function_shared(
511                                    Arc::clone(&store),
512                                    instance,
513                                    Arc::clone(&guest_resources),
514                                    Arc::clone(&host_resources),
515                                    Arc::clone(&io_stream_resources),
516                                    ty,
517                                    instance_name,
518                                    name,
519                                )
520                                .await?;
521                            handlers.spawn(async move {
522                                let mut invocations = pin!(invocations);
523                                while let Some(invocation) = invocations.next().await {
524                                    match invocation {
525                                        Ok((_, fut)) => {
526                                            info!("serving instance function invocation");
527                                            if let Err(err) = fut.await {
528                                                warn!(
529                                                    ?err,
530                                                    "failed to serve instance function invocation"
531                                                );
532                                            } else {
533                                                info!(
534                                                    "successfully served instance function invocation"
535                                                );
536                                            }
537                                        }
538                                        Err(err) => {
539                                            error!(
540                                                ?err,
541                                                "failed to accept instance function invocation"
542                                            );
543                                        }
544                                    }
545                                }
546                            }
547                            .instrument(span.clone()));
548                        }
549                        types::ComponentItem::CoreFunc(_) => {
550                            warn!(
551                                instance_name,
552                                name, "serving instance core function exports not supported yet"
553                            );
554                        }
555                        types::ComponentItem::Module(_) => {
556                            warn!(
557                                instance_name,
558                                name, "serving instance module exports not supported yet"
559                            );
560                        }
561                        types::ComponentItem::Component(_) => {
562                            warn!(
563                                instance_name,
564                                name, "serving instance component exports not supported yet"
565                            );
566                        }
567                        types::ComponentItem::ComponentInstance(_) => {
568                            warn!(
569                                instance_name,
570                                name, "serving nested instance exports not supported yet"
571                            );
572                        }
573                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
574                    }
575                }
576            }
577            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
578        }
579    }
580    Ok(())
581}
582
583#[instrument(level = "trace", skip_all, ret(level = "trace"))]
584#[allow(clippy::too_many_arguments)]
585pub async fn serve_stateless<C, S>(
586    handlers: &mut JoinSet<()>,
587    srv: S,
588    clt: C,
589    cx: C::Context,
590    pre: InstancePre<Ctx<C>>,
591    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
592    engine: &Engine,
593    timeout: Duration,
594) -> anyhow::Result<()>
595where
596    C: Invoke + Clone + 'static,
597    C::Context: Clone + 'static,
598    S: Serve,
599{
600    let span = Span::current();
601    for (name, ty) in pre.component().component_type().exports(engine) {
602        match (name, ty) {
603            (name, types::ComponentItem::ComponentFunc(ty)) => {
604                let clt = clt.clone();
605                let cx = cx.clone();
606                let engine = engine.clone();
607                info!(?name, "serving root function");
608                let invocations = srv
609                    .serve_function(
610                        move || {
611                            new_store(&engine, clt.clone(), cx.clone(), "reactor.wasm", timeout)
612                        },
613                        pre.clone(),
614                        Arc::clone(&host_resources),
615                        ty,
616                        "",
617                        name,
618                    )
619                    .await?;
620                handlers.spawn(
621                    async move {
622                        let mut invocations = pin!(invocations);
623                        while let Some(invocation) = invocations.next().await {
624                            match invocation {
625                                Ok((_, fut)) => {
626                                    info!("serving root function invocation");
627                                    if let Err(err) = fut.await {
628                                        warn!(?err, "failed to serve root function invocation");
629                                    } else {
630                                        info!("successfully served root function invocation");
631                                    }
632                                }
633                                Err(err) => {
634                                    error!(?err, "failed to accept root function invocation");
635                                }
636                            }
637                        }
638                    }
639                    .instrument(span.clone()),
640                );
641            }
642            (_, types::ComponentItem::CoreFunc(_)) => {
643                warn!(name, "serving root core function exports not supported yet");
644            }
645            (_, types::ComponentItem::Module(_)) => {
646                warn!(name, "serving root module exports not supported yet");
647            }
648            (_, types::ComponentItem::Component(_)) => {
649                warn!(name, "serving root component exports not supported yet");
650            }
651            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
652                for (name, ty) in ty.exports(engine) {
653                    match ty {
654                        types::ComponentItem::ComponentFunc(ty) => {
655                            let clt = clt.clone();
656                            let engine = engine.clone();
657                            let cx = cx.clone();
658                            info!(?name, "serving instance function");
659                            let invocations = srv
660                                .serve_function(
661                                    move || {
662                                        new_store(
663                                            &engine,
664                                            clt.clone(),
665                                            cx.clone(),
666                                            "reactor.wasm",
667                                            timeout,
668                                        )
669                                    },
670                                    pre.clone(),
671                                    Arc::clone(&host_resources),
672                                    ty,
673                                    instance_name,
674                                    name,
675                                )
676                                .await?;
677                            handlers.spawn(async move {
678                                let mut invocations = pin!(invocations);
679                                while let Some(invocation) = invocations.next().await {
680                                    match invocation {
681                                        Ok((_, fut)) => {
682                                            info!("serving instance function invocation");
683                                            if let Err(err) = fut.await {
684                                                warn!(
685                                                    ?err,
686                                                    "failed to serve instance function invocation"
687                                                );
688                                            } else {
689                                                info!(
690                                                    "successfully served instance function invocation"
691                                                );
692                                            }
693                                        }
694                                        Err(err) => {
695                                            error!(
696                                                ?err,
697                                                "failed to accept instance function invocation"
698                                            );
699                                        }
700                                    }
701                                }
702                            }.instrument(span.clone()));
703                        }
704                        types::ComponentItem::CoreFunc(_) => {
705                            warn!(
706                                instance_name,
707                                name, "serving instance core function exports not supported yet"
708                            );
709                        }
710                        types::ComponentItem::Module(_) => {
711                            warn!(
712                                instance_name,
713                                name, "serving instance module exports not supported yet"
714                            );
715                        }
716                        types::ComponentItem::Component(_) => {
717                            warn!(
718                                instance_name,
719                                name, "serving instance component exports not supported yet"
720                            );
721                        }
722                        types::ComponentItem::ComponentInstance(_) => {
723                            warn!(
724                                instance_name,
725                                name, "serving nested instance exports not supported yet"
726                            );
727                        }
728                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
729                    }
730                }
731            }
732            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
733        }
734    }
735    Ok(())
736}
737
738#[instrument(level = "trace", skip(srv, clt, cx), ret(level = "trace"))]
739pub async fn handle_serve<C, S>(
740    srv: S,
741    clt: C,
742    cx: C::Context,
743    timeout: Duration,
744    workload: &str,
745) -> anyhow::Result<()>
746where
747    C: Invoke + Clone + 'static,
748    C::Context: Clone + 'static,
749    S: Serve,
750{
751    let (pre, engine, guest_resources, host_resources) =
752        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, workload).await?;
753
754    let mut handlers = JoinSet::new();
755    if guest_resources.is_empty() {
756        serve_stateless(
757            &mut handlers,
758            srv,
759            clt,
760            cx,
761            pre,
762            host_resources,
763            &engine,
764            timeout,
765        )
766        .await?;
767    } else {
768        serve_shared(
769            &mut handlers,
770            srv,
771            new_store(&engine, clt, cx, "reactor.wasm", timeout),
772            pre,
773            guest_resources,
774            host_resources,
775        )
776        .await?;
777    }
778    while let Some(res) = handlers.join_next().await {
779        if let Err(err) = res {
780            error!(?err, "handler failed");
781        }
782    }
783    Ok(())
784}
785
786#[instrument(level = "trace", ret(level = "trace"))]
787pub async fn run() -> anyhow::Result<()> {
788    wrpc_cli::tracing::init();
789    match Command::parse() {
790        Command::Nats(args) => nats::run(args).await,
791        Command::Tcp(args) => tcp::run(args).await,
792    }
793}