wrpc_wasmtime_nats_cli/
lib.rs

1#![allow(clippy::type_complexity)]
2
3use core::pin::pin;
4use core::time::Duration;
5
6use std::sync::Arc;
7
8use anyhow::{anyhow, bail, Context as _};
9use clap::Parser;
10use futures::StreamExt as _;
11use tokio::fs;
12use tokio::sync::Mutex;
13use tokio::task::JoinSet;
14use tracing::{error, info, instrument, warn, Instrument as _, Span};
15use url::Url;
16use wasi_preview1_component_adapter_provider::{
17    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
18    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
19};
20use wasmtime::component::{types, Component, InstancePre, Linker, ResourceType};
21use wasmtime::{Engine, Store};
22use wasmtime_wasi::{ResourceTable, WasiCtxBuilder};
23use wasmtime_wasi::{WasiCtx, WasiView};
24use wrpc_runtime_wasmtime::{
25    collect_component_resources, link_item, ServeExt as _, SharedResourceTable, WrpcView,
26};
27use wrpc_transport::Invoke;
28
29const DEFAULT_TIMEOUT: &str = "10s";
30
31#[derive(Parser, Debug)]
32#[command(author, version, about, long_about = None)]
33enum Command {
34    Run(RunArgs),
35    Serve(ServeArgs),
36}
37
38/// Run a command component
39#[derive(Parser, Debug)]
40struct RunArgs {
41    /// NATS address to use
42    #[arg(short, long, default_value = wrpc_cli::nats::DEFAULT_URL)]
43    nats: String,
44
45    /// Invocation timeout
46    #[arg(long, default_value = DEFAULT_TIMEOUT)]
47    timeout: humantime::Duration,
48
49    /// Target prefix to send invocations to
50    target: String,
51
52    /// Path or URL to Wasm command component
53    workload: String,
54}
55
56/// Serve a reactor component
57#[derive(Parser, Debug)]
58struct ServeArgs {
59    /// NATS address to use
60    #[arg(short, long, default_value = wrpc_cli::nats::DEFAULT_URL)]
61    nats: String,
62
63    /// Invocation timeout
64    #[arg(long, default_value = DEFAULT_TIMEOUT)]
65    timeout: humantime::Duration,
66
67    /// NATS queue group to use
68    #[arg(short, long)]
69    group: Option<String>,
70
71    /// Target prefix to send invocations to
72    target: String,
73
74    /// Prefix to listen on
75    prefix: String,
76
77    /// Path or URL to Wasm command component
78    workload: String,
79}
80
81pub enum Workload {
82    Url(Url),
83    Binary(Vec<u8>),
84}
85
86pub struct Ctx<C: Invoke> {
87    pub table: ResourceTable,
88    pub wasi: WasiCtx,
89    pub wrpc: C,
90    pub shared_resources: SharedResourceTable,
91    pub timeout: Duration,
92}
93
94impl<C: Invoke> WrpcView for Ctx<C> {
95    type Invoke = C;
96
97    fn client(&self) -> &Self::Invoke {
98        &self.wrpc
99    }
100
101    fn shared_resources(&mut self) -> &mut SharedResourceTable {
102        &mut self.shared_resources
103    }
104
105    fn timeout(&self) -> Option<Duration> {
106        Some(self.timeout)
107    }
108}
109
110impl<C: Invoke> WasiView for Ctx<C> {
111    fn ctx(&mut self) -> &mut WasiCtx {
112        &mut self.wasi
113    }
114    fn table(&mut self) -> &mut ResourceTable {
115        &mut self.table
116    }
117}
118
119#[instrument(level = "trace", skip(adapter, cx))]
120async fn instantiate_pre<C>(
121    adapter: &[u8],
122    cx: C::Context,
123    workload: &str,
124) -> anyhow::Result<(InstancePre<Ctx<C>>, Engine, Arc<[ResourceType]>)>
125where
126    C: Invoke,
127    C::Context: Clone + 'static,
128{
129    let engine = Engine::new(
130        wasmtime::Config::new()
131            .async_support(true)
132            .wasm_component_model(true),
133    )
134    .context("failed to initialize Wasmtime engine")?;
135
136    let wasm = if workload.starts_with('.') {
137        fs::read(&workload)
138            .await
139            .with_context(|| format!("failed to read relative path to workload `{workload}`"))
140            .map(Workload::Binary)
141    } else {
142        Url::parse(workload)
143            .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
144            .map(Workload::Url)
145    }?;
146    let wasm = match wasm {
147        Workload::Url(wasm) => match wasm.scheme() {
148            "file" => {
149                let wasm = wasm
150                    .to_file_path()
151                    .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
152                fs::read(wasm)
153                    .await
154                    .context("failed to read Wasm from file URL")?
155            }
156            "http" | "https" => {
157                let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
158                let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
159                wasm.to_vec()
160            }
161            scheme => bail!("URL scheme `{scheme}` not supported"),
162        },
163        Workload::Binary(wasm) => wasm,
164    };
165    let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
166        wit_component::ComponentEncoder::default()
167            .validate(true)
168            .module(&wasm)
169            .context("failed to set core component module")?
170            .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
171            .context("failed to add WASI adapter")?
172            .encode()
173            .context("failed to encode a component")?
174    } else {
175        wasm
176    };
177
178    let component = Component::new(&engine, wasm).context("failed to compile component")?;
179
180    let mut linker = Linker::<Ctx<C>>::new(&engine);
181    wasmtime_wasi::add_to_linker_async(&mut linker).context("failed to link WASI")?;
182
183    let ty = component.component_type();
184    let mut resources = Vec::new();
185    collect_component_resources(&engine, &ty, &mut resources);
186    let resources = Arc::from(resources);
187    for (name, item) in ty.imports(&engine) {
188        // Avoid polyfilling instances, for which static bindings are linked
189        match name {
190            "wasi:cli/environment@0.2.0"
191            | "wasi:cli/exit@0.2.0"
192            | "wasi:cli/stderr@0.2.0"
193            | "wasi:cli/stdin@0.2.0"
194            | "wasi:cli/stdout@0.2.0"
195            | "wasi:cli/terminal-input@0.2.0"
196            | "wasi:cli/terminal-output@0.2.0"
197            | "wasi:cli/terminal-stderr@0.2.0"
198            | "wasi:cli/terminal-stdin@0.2.0"
199            | "wasi:cli/terminal-stdout@0.2.0"
200            | "wasi:clocks/monotonic-clock@0.2.0"
201            | "wasi:clocks/wall-clock@0.2.0"
202            | "wasi:filesystem/preopens@0.2.0"
203            | "wasi:filesystem/types@0.2.0"
204            | "wasi:http/incoming-handler@0.2.0"
205            | "wasi:http/outgoing-handler@0.2.0"
206            | "wasi:http/types@0.2.0"
207            | "wasi:io/error@0.2.0"
208            | "wasi:io/poll@0.2.0"
209            | "wasi:io/streams@0.2.0"
210            | "wasi:random/random@0.2.0"
211            | "wasi:sockets/instance-network@0.2.0"
212            | "wasi:sockets/network@0.2.0"
213            | "wasi:sockets/tcp-create-socket@0.2.0"
214            | "wasi:sockets/tcp@0.2.0"
215            | "wasi:sockets/udp-create-socket@0.2.0"
216            | "wasi:sockets/udp@0.2.0" => continue,
217            _ => {}
218        }
219        if let Err(err) = link_item(
220            &engine,
221            &mut linker.root(),
222            Arc::clone(&resources),
223            item,
224            "",
225            name,
226            cx.clone(),
227        ) {
228            error!(?err, "failed to polyfill instance");
229        }
230    }
231
232    let pre = linker
233        .instantiate_pre(&component)
234        .context("failed to pre-instantiate component")?;
235    Ok((pre, engine, resources))
236}
237
238fn new_store<C: Invoke>(
239    engine: &Engine,
240    wrpc: C,
241    arg0: &str,
242    timeout: Duration,
243) -> wasmtime::Store<Ctx<C>> {
244    Store::new(
245        engine,
246        Ctx {
247            wasi: WasiCtxBuilder::new()
248                .inherit_env()
249                .inherit_stdio()
250                .inherit_network()
251                .args(&[arg0])
252                .build(),
253            table: ResourceTable::new(),
254            shared_resources: SharedResourceTable::default(),
255            wrpc,
256            timeout,
257        },
258    )
259}
260
261#[instrument(level = "trace", ret)]
262pub async fn handle_run(
263    RunArgs {
264        nats,
265        timeout,
266        target,
267        ref workload,
268    }: RunArgs,
269) -> anyhow::Result<()> {
270    let nats = wrpc_cli::nats::connect(nats)
271        .await
272        .context("failed to connect to NATS")?;
273
274    let (pre, engine, _) =
275        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, None, workload).await?;
276    let mut store = new_store(
277        &engine,
278        wrpc_transport_nats::Client::new(nats, target, None),
279        "command.wasm",
280        *timeout,
281    );
282    let cmd = wasmtime_wasi::bindings::CommandPre::new(pre)
283        .context("failed to construct `command` instance")?
284        .instantiate_async(&mut store)
285        .await
286        .context("failed to instantiate `command`")?;
287    cmd.wasi_cli_run()
288        .call_run(&mut store)
289        .await
290        .context("failed to run component")?
291        .map_err(|()| anyhow!("component failed"))
292}
293
294#[instrument(level = "trace", skip_all, ret)]
295pub async fn serve_shared(
296    handlers: &mut JoinSet<()>,
297    srv: wrpc_transport_nats::Client,
298    mut store: wasmtime::Store<Ctx<wrpc_transport_nats::Client>>,
299    pre: InstancePre<Ctx<wrpc_transport_nats::Client>>,
300    guest_resources: Arc<[ResourceType]>,
301) -> anyhow::Result<()> {
302    let span = Span::current();
303    let instance = pre
304        .instantiate_async(&mut store)
305        .await
306        .context("failed to instantiate component")?;
307    let engine = store.engine().clone();
308    let store = Arc::new(Mutex::new(store));
309    for (name, ty) in pre.component().component_type().exports(&engine) {
310        match (name, ty) {
311            (name, types::ComponentItem::ComponentFunc(ty)) => {
312                info!(?name, "serving root function");
313                let invocations = srv
314                    .serve_function_shared(
315                        Arc::clone(&store),
316                        instance,
317                        Arc::clone(&guest_resources),
318                        ty,
319                        "",
320                        name,
321                    )
322                    .await?;
323                handlers.spawn(
324                    async move {
325                        let mut invocations = pin!(invocations);
326                        while let Some(invocation) = invocations.next().await {
327                            match invocation {
328                                Ok((headers, fut)) => {
329                                    info!(?headers, "serving root function invocation");
330                                    if let Err(err) = fut.await {
331                                        warn!(
332                                            ?headers,
333                                            ?err,
334                                            "failed to serve root function invocation"
335                                        );
336                                    } else {
337                                        info!("successfully served root function invocation");
338                                    }
339                                }
340                                Err(err) => {
341                                    error!(?err, "failed to accept root function invocation");
342                                }
343                            }
344                        }
345                    }
346                    .instrument(span.clone()),
347                );
348            }
349            (_, types::ComponentItem::CoreFunc(_)) => {
350                warn!(name, "serving root core function exports not supported yet");
351            }
352            (_, types::ComponentItem::Module(_)) => {
353                warn!(name, "serving root module exports not supported yet");
354            }
355            (_, types::ComponentItem::Component(_)) => {
356                warn!(name, "serving root component exports not supported yet");
357            }
358            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
359                for (name, ty) in ty.exports(&engine) {
360                    match ty {
361                        types::ComponentItem::ComponentFunc(ty) => {
362                            info!(?name, "serving instance function");
363                            let invocations = srv
364                                .serve_function_shared(
365                                    Arc::clone(&store),
366                                    instance,
367                                    Arc::clone(&guest_resources),
368                                    ty,
369                                    instance_name,
370                                    name,
371                                )
372                                .await?;
373                            handlers.spawn(async move {
374                                let mut invocations = pin!(invocations);
375                                while let Some(invocation) = invocations.next().await {
376                                    match invocation {
377                                        Ok((headers, fut)) => {
378                                            info!(?headers, "serving instance function invocation");
379                                            if let Err(err) = fut.await {
380                                                warn!(
381                                                    ?headers,
382                                                    ?err,
383                                                    "failed to serve instance function invocation"
384                                                );
385                                            } else {
386                                                info!(
387                                                    "successfully served instance function invocation"
388                                                );
389                                            }
390                                        }
391                                        Err(err) => {
392                                            error!(
393                                                ?err,
394                                                "failed to accept instance function invocation"
395                                            );
396                                        }
397                                    }
398                                }
399                            }
400                            .instrument(span.clone()));
401                        }
402                        types::ComponentItem::CoreFunc(_) => {
403                            warn!(
404                                instance_name,
405                                name, "serving instance core function exports not supported yet"
406                            );
407                        }
408                        types::ComponentItem::Module(_) => {
409                            warn!(
410                                instance_name,
411                                name, "serving instance module exports not supported yet"
412                            );
413                        }
414                        types::ComponentItem::Component(_) => {
415                            warn!(
416                                instance_name,
417                                name, "serving instance component exports not supported yet"
418                            );
419                        }
420                        types::ComponentItem::ComponentInstance(_) => {
421                            warn!(
422                                instance_name,
423                                name, "serving nested instance exports not supported yet"
424                            );
425                        }
426                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
427                    }
428                }
429            }
430            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
431        }
432    }
433    Ok(())
434}
435
436#[instrument(level = "trace", skip_all, ret)]
437pub async fn serve_stateless(
438    handlers: &mut JoinSet<()>,
439    srv: wrpc_transport_nats::Client,
440    clt: wrpc_transport_nats::Client,
441    pre: InstancePre<Ctx<wrpc_transport_nats::Client>>,
442    engine: &Engine,
443    timeout: Duration,
444) -> anyhow::Result<()> {
445    let span = Span::current();
446    for (name, ty) in pre.component().component_type().exports(engine) {
447        match (name, ty) {
448            (name, types::ComponentItem::ComponentFunc(ty)) => {
449                let clt = clt.clone();
450                let engine = engine.clone();
451                info!(?name, "serving root function");
452                let invocations = srv
453                    .serve_function(
454                        move || new_store(&engine, clt.clone(), "reactor.wasm", timeout),
455                        pre.clone(),
456                        ty,
457                        "",
458                        name,
459                    )
460                    .await?;
461                handlers.spawn(
462                    async move {
463                        let mut invocations = pin!(invocations);
464                        while let Some(invocation) = invocations.next().await {
465                            match invocation {
466                                Ok((headers, fut)) => {
467                                    info!(?headers, "serving root function invocation");
468                                    if let Err(err) = fut.await {
469                                        warn!(
470                                            ?headers,
471                                            ?err,
472                                            "failed to serve root function invocation"
473                                        );
474                                    } else {
475                                        info!("successfully served root function invocation");
476                                    }
477                                }
478                                Err(err) => {
479                                    error!(?err, "failed to accept root function invocation");
480                                }
481                            }
482                        }
483                    }
484                    .instrument(span.clone()),
485                );
486            }
487            (_, types::ComponentItem::CoreFunc(_)) => {
488                warn!(name, "serving root core function exports not supported yet");
489            }
490            (_, types::ComponentItem::Module(_)) => {
491                warn!(name, "serving root module exports not supported yet");
492            }
493            (_, types::ComponentItem::Component(_)) => {
494                warn!(name, "serving root component exports not supported yet");
495            }
496            (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
497                for (name, ty) in ty.exports(engine) {
498                    match ty {
499                        types::ComponentItem::ComponentFunc(ty) => {
500                            let clt = clt.clone();
501                            let engine = engine.clone();
502                            info!(?name, "serving instance function");
503                            let invocations = srv
504                                .serve_function(
505                                    move || {
506                                        new_store(&engine, clt.clone(), "reactor.wasm", timeout)
507                                    },
508                                    pre.clone(),
509                                    ty,
510                                    instance_name,
511                                    name,
512                                )
513                                .await?;
514                            handlers.spawn(async move {
515                                let mut invocations = pin!(invocations);
516                                while let Some(invocation) = invocations.next().await {
517                                    match invocation {
518                                        Ok((headers, fut)) => {
519                                            info!(?headers, "serving instance function invocation");
520                                            if let Err(err) = fut.await {
521                                                warn!(
522                                                    ?headers,
523                                                    ?err,
524                                                    "failed to serve instance function invocation"
525                                                );
526                                            } else {
527                                                info!(
528                                                    "successfully served instance function invocation"
529                                                );
530                                            }
531                                        }
532                                        Err(err) => {
533                                            error!(
534                                                ?err,
535                                                "failed to accept instance function invocation"
536                                            );
537                                        }
538                                    }
539                                }
540                            }.instrument(span.clone()));
541                        }
542                        types::ComponentItem::CoreFunc(_) => {
543                            warn!(
544                                instance_name,
545                                name, "serving instance core function exports not supported yet"
546                            );
547                        }
548                        types::ComponentItem::Module(_) => {
549                            warn!(
550                                instance_name,
551                                name, "serving instance module exports not supported yet"
552                            );
553                        }
554                        types::ComponentItem::Component(_) => {
555                            warn!(
556                                instance_name,
557                                name, "serving instance component exports not supported yet"
558                            );
559                        }
560                        types::ComponentItem::ComponentInstance(_) => {
561                            warn!(
562                                instance_name,
563                                name, "serving nested instance exports not supported yet"
564                            );
565                        }
566                        types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
567                    }
568                }
569            }
570            (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
571        }
572    }
573    Ok(())
574}
575
576#[instrument(level = "trace", ret)]
577pub async fn handle_serve(
578    ServeArgs {
579        nats,
580        timeout,
581        prefix,
582        target,
583        group,
584        ref workload,
585    }: ServeArgs,
586) -> anyhow::Result<()> {
587    let nats = wrpc_cli::nats::connect(nats)
588        .await
589        .context("failed to connect to NATS")?;
590    let nats = Arc::new(nats);
591
592    let (pre, engine, guest_resources) =
593        instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, None, workload).await?;
594
595    let clt = wrpc_transport_nats::Client::new(Arc::clone(&nats), target, None);
596    let srv = wrpc_transport_nats::Client::new(nats, prefix, group.map(Arc::from));
597    let mut handlers = JoinSet::new();
598    if guest_resources.is_empty() {
599        serve_stateless(&mut handlers, srv, clt, pre, &engine, *timeout).await?;
600    } else {
601        serve_shared(
602            &mut handlers,
603            srv,
604            new_store(&engine, clt, "reactor.wasm", *timeout),
605            pre,
606            guest_resources,
607        )
608        .await?;
609    }
610    while let Some(res) = handlers.join_next().await {
611        if let Err(err) = res {
612            error!(?err, "handler failed");
613        }
614    }
615    Ok(())
616}
617
618#[instrument(level = "trace", ret)]
619pub async fn run() -> anyhow::Result<()> {
620    wrpc_cli::tracing::init();
621    match Command::parse() {
622        Command::Run(args) => handle_run(args).await,
623        Command::Serve(args) => handle_serve(args).await,
624    }
625}