1#![allow(clippy::type_complexity)]
2
3use core::pin::pin;
4use core::time::Duration;
5
6use std::sync::Arc;
7
8use anyhow::{anyhow, bail, Context as _};
9use clap::Parser;
10use futures::StreamExt as _;
11use tokio::fs;
12use tokio::sync::Mutex;
13use tokio::task::JoinSet;
14use tracing::{error, info, instrument, warn, Instrument as _, Span};
15use url::Url;
16use wasi_preview1_component_adapter_provider::{
17 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER,
18 WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
19};
20use wasmtime::component::{types, Component, InstancePre, Linker, ResourceType};
21use wasmtime::{Engine, Store};
22use wasmtime_wasi::{ResourceTable, WasiCtxBuilder};
23use wasmtime_wasi::{WasiCtx, WasiView};
24use wrpc_runtime_wasmtime::{
25 collect_component_resources, link_item, ServeExt as _, SharedResourceTable, WrpcView,
26};
27use wrpc_transport::Invoke;
28
/// Default value of the `--timeout` argument of both subcommands; parsed by clap into a
/// `humantime::Duration`.
const DEFAULT_TIMEOUT: &str = "10s";
30
31#[derive(Parser, Debug)]
32#[command(author, version, about, long_about = None)]
33enum Command {
34 Run(RunArgs),
35 Serve(ServeArgs),
36}
37
38#[derive(Parser, Debug)]
40struct RunArgs {
41 #[arg(short, long, default_value = wrpc_cli::nats::DEFAULT_URL)]
43 nats: String,
44
45 #[arg(long, default_value = DEFAULT_TIMEOUT)]
47 timeout: humantime::Duration,
48
49 target: String,
51
52 workload: String,
54}
55
56#[derive(Parser, Debug)]
58struct ServeArgs {
59 #[arg(short, long, default_value = wrpc_cli::nats::DEFAULT_URL)]
61 nats: String,
62
63 #[arg(long, default_value = DEFAULT_TIMEOUT)]
65 timeout: humantime::Duration,
66
67 #[arg(short, long)]
69 group: Option<String>,
70
71 target: String,
73
74 prefix: String,
76
77 workload: String,
79}
80
/// A Wasm workload, either still to be fetched or already loaded into memory.
pub enum Workload {
    /// Location the Wasm bytes have to be read from.
    Url(Url),
    /// Wasm bytes that are already in memory.
    Binary(Vec<u8>),
}
85
/// Store data that backs both the [`WasiView`] and the [`WrpcView`] implementations.
pub struct Ctx<C: Invoke> {
    /// Resource table handed out by [`WasiView::table`].
    pub table: ResourceTable,
    /// WASI context handed out by [`WasiView::ctx`].
    pub wasi: WasiCtx,
    /// wRPC client handed out by [`WrpcView::client`].
    pub wrpc: C,
    /// Shared resource table handed out by [`WrpcView::shared_resources`].
    pub shared_resources: SharedResourceTable,
    /// Timeout reported by [`WrpcView::timeout`].
    pub timeout: Duration,
}
93
/// Exposes the wRPC-related fields of [`Ctx`].
impl<C: Invoke> WrpcView for Ctx<C> {
    type Invoke = C;

    /// Returns the wRPC client stored in the context.
    fn client(&self) -> &Self::Invoke {
        &self.wrpc
    }

    /// Returns the shared resource table stored in the context.
    fn shared_resources(&mut self) -> &mut SharedResourceTable {
        &mut self.shared_resources
    }

    /// Returns the configured timeout; this implementation always returns `Some`.
    fn timeout(&self) -> Option<Duration> {
        Some(self.timeout)
    }
}
109
/// Exposes the WASI-related fields of [`Ctx`].
impl<C: Invoke> WasiView for Ctx<C> {
    /// Returns the WASI context stored in the context.
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.wasi
    }
    /// Returns the resource table stored in the context.
    fn table(&mut self) -> &mut ResourceTable {
        &mut self.table
    }
}
118
119#[instrument(level = "trace", skip(adapter, cx))]
120async fn instantiate_pre<C>(
121 adapter: &[u8],
122 cx: C::Context,
123 workload: &str,
124) -> anyhow::Result<(InstancePre<Ctx<C>>, Engine, Arc<[ResourceType]>)>
125where
126 C: Invoke,
127 C::Context: Clone + 'static,
128{
129 let engine = Engine::new(
130 wasmtime::Config::new()
131 .async_support(true)
132 .wasm_component_model(true),
133 )
134 .context("failed to initialize Wasmtime engine")?;
135
136 let wasm = if workload.starts_with('.') {
137 fs::read(&workload)
138 .await
139 .with_context(|| format!("failed to read relative path to workload `{workload}`"))
140 .map(Workload::Binary)
141 } else {
142 Url::parse(workload)
143 .with_context(|| format!("failed to parse Wasm URL `{workload}`"))
144 .map(Workload::Url)
145 }?;
146 let wasm = match wasm {
147 Workload::Url(wasm) => match wasm.scheme() {
148 "file" => {
149 let wasm = wasm
150 .to_file_path()
151 .map_err(|()| anyhow!("failed to convert Wasm URL to file path"))?;
152 fs::read(wasm)
153 .await
154 .context("failed to read Wasm from file URL")?
155 }
156 "http" | "https" => {
157 let wasm = reqwest::get(wasm).await.context("failed to GET Wasm URL")?;
158 let wasm = wasm.bytes().await.context("failed fetch Wasm from URL")?;
159 wasm.to_vec()
160 }
161 scheme => bail!("URL scheme `{scheme}` not supported"),
162 },
163 Workload::Binary(wasm) => wasm,
164 };
165 let wasm = if wasmparser::Parser::is_core_wasm(&wasm) {
166 wit_component::ComponentEncoder::default()
167 .validate(true)
168 .module(&wasm)
169 .context("failed to set core component module")?
170 .adapter(WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, adapter)
171 .context("failed to add WASI adapter")?
172 .encode()
173 .context("failed to encode a component")?
174 } else {
175 wasm
176 };
177
178 let component = Component::new(&engine, wasm).context("failed to compile component")?;
179
180 let mut linker = Linker::<Ctx<C>>::new(&engine);
181 wasmtime_wasi::add_to_linker_async(&mut linker).context("failed to link WASI")?;
182
183 let ty = component.component_type();
184 let mut resources = Vec::new();
185 collect_component_resources(&engine, &ty, &mut resources);
186 let resources = Arc::from(resources);
187 for (name, item) in ty.imports(&engine) {
188 match name {
190 "wasi:cli/environment@0.2.0"
191 | "wasi:cli/exit@0.2.0"
192 | "wasi:cli/stderr@0.2.0"
193 | "wasi:cli/stdin@0.2.0"
194 | "wasi:cli/stdout@0.2.0"
195 | "wasi:cli/terminal-input@0.2.0"
196 | "wasi:cli/terminal-output@0.2.0"
197 | "wasi:cli/terminal-stderr@0.2.0"
198 | "wasi:cli/terminal-stdin@0.2.0"
199 | "wasi:cli/terminal-stdout@0.2.0"
200 | "wasi:clocks/monotonic-clock@0.2.0"
201 | "wasi:clocks/wall-clock@0.2.0"
202 | "wasi:filesystem/preopens@0.2.0"
203 | "wasi:filesystem/types@0.2.0"
204 | "wasi:http/incoming-handler@0.2.0"
205 | "wasi:http/outgoing-handler@0.2.0"
206 | "wasi:http/types@0.2.0"
207 | "wasi:io/error@0.2.0"
208 | "wasi:io/poll@0.2.0"
209 | "wasi:io/streams@0.2.0"
210 | "wasi:random/random@0.2.0"
211 | "wasi:sockets/instance-network@0.2.0"
212 | "wasi:sockets/network@0.2.0"
213 | "wasi:sockets/tcp-create-socket@0.2.0"
214 | "wasi:sockets/tcp@0.2.0"
215 | "wasi:sockets/udp-create-socket@0.2.0"
216 | "wasi:sockets/udp@0.2.0" => continue,
217 _ => {}
218 }
219 if let Err(err) = link_item(
220 &engine,
221 &mut linker.root(),
222 Arc::clone(&resources),
223 item,
224 "",
225 name,
226 cx.clone(),
227 ) {
228 error!(?err, "failed to polyfill instance");
229 }
230 }
231
232 let pre = linker
233 .instantiate_pre(&component)
234 .context("failed to pre-instantiate component")?;
235 Ok((pre, engine, resources))
236}
237
238fn new_store<C: Invoke>(
239 engine: &Engine,
240 wrpc: C,
241 arg0: &str,
242 timeout: Duration,
243) -> wasmtime::Store<Ctx<C>> {
244 Store::new(
245 engine,
246 Ctx {
247 wasi: WasiCtxBuilder::new()
248 .inherit_env()
249 .inherit_stdio()
250 .inherit_network()
251 .args(&[arg0])
252 .build(),
253 table: ResourceTable::new(),
254 shared_resources: SharedResourceTable::default(),
255 wrpc,
256 timeout,
257 },
258 )
259}
260
261#[instrument(level = "trace", ret)]
262pub async fn handle_run(
263 RunArgs {
264 nats,
265 timeout,
266 target,
267 ref workload,
268 }: RunArgs,
269) -> anyhow::Result<()> {
270 let nats = wrpc_cli::nats::connect(nats)
271 .await
272 .context("failed to connect to NATS")?;
273
274 let (pre, engine, _) =
275 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_COMMAND_ADAPTER, None, workload).await?;
276 let mut store = new_store(
277 &engine,
278 wrpc_transport_nats::Client::new(nats, target, None),
279 "command.wasm",
280 *timeout,
281 );
282 let cmd = wasmtime_wasi::bindings::CommandPre::new(pre)
283 .context("failed to construct `command` instance")?
284 .instantiate_async(&mut store)
285 .await
286 .context("failed to instantiate `command`")?;
287 cmd.wasi_cli_run()
288 .call_run(&mut store)
289 .await
290 .context("failed to run component")?
291 .map_err(|()| anyhow!("component failed"))
292}
293
294#[instrument(level = "trace", skip_all, ret)]
295pub async fn serve_shared(
296 handlers: &mut JoinSet<()>,
297 srv: wrpc_transport_nats::Client,
298 mut store: wasmtime::Store<Ctx<wrpc_transport_nats::Client>>,
299 pre: InstancePre<Ctx<wrpc_transport_nats::Client>>,
300 guest_resources: Arc<[ResourceType]>,
301) -> anyhow::Result<()> {
302 let span = Span::current();
303 let instance = pre
304 .instantiate_async(&mut store)
305 .await
306 .context("failed to instantiate component")?;
307 let engine = store.engine().clone();
308 let store = Arc::new(Mutex::new(store));
309 for (name, ty) in pre.component().component_type().exports(&engine) {
310 match (name, ty) {
311 (name, types::ComponentItem::ComponentFunc(ty)) => {
312 info!(?name, "serving root function");
313 let invocations = srv
314 .serve_function_shared(
315 Arc::clone(&store),
316 instance,
317 Arc::clone(&guest_resources),
318 ty,
319 "",
320 name,
321 )
322 .await?;
323 handlers.spawn(
324 async move {
325 let mut invocations = pin!(invocations);
326 while let Some(invocation) = invocations.next().await {
327 match invocation {
328 Ok((headers, fut)) => {
329 info!(?headers, "serving root function invocation");
330 if let Err(err) = fut.await {
331 warn!(
332 ?headers,
333 ?err,
334 "failed to serve root function invocation"
335 );
336 } else {
337 info!("successfully served root function invocation");
338 }
339 }
340 Err(err) => {
341 error!(?err, "failed to accept root function invocation");
342 }
343 }
344 }
345 }
346 .instrument(span.clone()),
347 );
348 }
349 (_, types::ComponentItem::CoreFunc(_)) => {
350 warn!(name, "serving root core function exports not supported yet");
351 }
352 (_, types::ComponentItem::Module(_)) => {
353 warn!(name, "serving root module exports not supported yet");
354 }
355 (_, types::ComponentItem::Component(_)) => {
356 warn!(name, "serving root component exports not supported yet");
357 }
358 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
359 for (name, ty) in ty.exports(&engine) {
360 match ty {
361 types::ComponentItem::ComponentFunc(ty) => {
362 info!(?name, "serving instance function");
363 let invocations = srv
364 .serve_function_shared(
365 Arc::clone(&store),
366 instance,
367 Arc::clone(&guest_resources),
368 ty,
369 instance_name,
370 name,
371 )
372 .await?;
373 handlers.spawn(async move {
374 let mut invocations = pin!(invocations);
375 while let Some(invocation) = invocations.next().await {
376 match invocation {
377 Ok((headers, fut)) => {
378 info!(?headers, "serving instance function invocation");
379 if let Err(err) = fut.await {
380 warn!(
381 ?headers,
382 ?err,
383 "failed to serve instance function invocation"
384 );
385 } else {
386 info!(
387 "successfully served instance function invocation"
388 );
389 }
390 }
391 Err(err) => {
392 error!(
393 ?err,
394 "failed to accept instance function invocation"
395 );
396 }
397 }
398 }
399 }
400 .instrument(span.clone()));
401 }
402 types::ComponentItem::CoreFunc(_) => {
403 warn!(
404 instance_name,
405 name, "serving instance core function exports not supported yet"
406 );
407 }
408 types::ComponentItem::Module(_) => {
409 warn!(
410 instance_name,
411 name, "serving instance module exports not supported yet"
412 );
413 }
414 types::ComponentItem::Component(_) => {
415 warn!(
416 instance_name,
417 name, "serving instance component exports not supported yet"
418 );
419 }
420 types::ComponentItem::ComponentInstance(_) => {
421 warn!(
422 instance_name,
423 name, "serving nested instance exports not supported yet"
424 );
425 }
426 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
427 }
428 }
429 }
430 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
431 }
432 }
433 Ok(())
434}
435
436#[instrument(level = "trace", skip_all, ret)]
437pub async fn serve_stateless(
438 handlers: &mut JoinSet<()>,
439 srv: wrpc_transport_nats::Client,
440 clt: wrpc_transport_nats::Client,
441 pre: InstancePre<Ctx<wrpc_transport_nats::Client>>,
442 engine: &Engine,
443 timeout: Duration,
444) -> anyhow::Result<()> {
445 let span = Span::current();
446 for (name, ty) in pre.component().component_type().exports(engine) {
447 match (name, ty) {
448 (name, types::ComponentItem::ComponentFunc(ty)) => {
449 let clt = clt.clone();
450 let engine = engine.clone();
451 info!(?name, "serving root function");
452 let invocations = srv
453 .serve_function(
454 move || new_store(&engine, clt.clone(), "reactor.wasm", timeout),
455 pre.clone(),
456 ty,
457 "",
458 name,
459 )
460 .await?;
461 handlers.spawn(
462 async move {
463 let mut invocations = pin!(invocations);
464 while let Some(invocation) = invocations.next().await {
465 match invocation {
466 Ok((headers, fut)) => {
467 info!(?headers, "serving root function invocation");
468 if let Err(err) = fut.await {
469 warn!(
470 ?headers,
471 ?err,
472 "failed to serve root function invocation"
473 );
474 } else {
475 info!("successfully served root function invocation");
476 }
477 }
478 Err(err) => {
479 error!(?err, "failed to accept root function invocation");
480 }
481 }
482 }
483 }
484 .instrument(span.clone()),
485 );
486 }
487 (_, types::ComponentItem::CoreFunc(_)) => {
488 warn!(name, "serving root core function exports not supported yet");
489 }
490 (_, types::ComponentItem::Module(_)) => {
491 warn!(name, "serving root module exports not supported yet");
492 }
493 (_, types::ComponentItem::Component(_)) => {
494 warn!(name, "serving root component exports not supported yet");
495 }
496 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
497 for (name, ty) in ty.exports(engine) {
498 match ty {
499 types::ComponentItem::ComponentFunc(ty) => {
500 let clt = clt.clone();
501 let engine = engine.clone();
502 info!(?name, "serving instance function");
503 let invocations = srv
504 .serve_function(
505 move || {
506 new_store(&engine, clt.clone(), "reactor.wasm", timeout)
507 },
508 pre.clone(),
509 ty,
510 instance_name,
511 name,
512 )
513 .await?;
514 handlers.spawn(async move {
515 let mut invocations = pin!(invocations);
516 while let Some(invocation) = invocations.next().await {
517 match invocation {
518 Ok((headers, fut)) => {
519 info!(?headers, "serving instance function invocation");
520 if let Err(err) = fut.await {
521 warn!(
522 ?headers,
523 ?err,
524 "failed to serve instance function invocation"
525 );
526 } else {
527 info!(
528 "successfully served instance function invocation"
529 );
530 }
531 }
532 Err(err) => {
533 error!(
534 ?err,
535 "failed to accept instance function invocation"
536 );
537 }
538 }
539 }
540 }.instrument(span.clone()));
541 }
542 types::ComponentItem::CoreFunc(_) => {
543 warn!(
544 instance_name,
545 name, "serving instance core function exports not supported yet"
546 );
547 }
548 types::ComponentItem::Module(_) => {
549 warn!(
550 instance_name,
551 name, "serving instance module exports not supported yet"
552 );
553 }
554 types::ComponentItem::Component(_) => {
555 warn!(
556 instance_name,
557 name, "serving instance component exports not supported yet"
558 );
559 }
560 types::ComponentItem::ComponentInstance(_) => {
561 warn!(
562 instance_name,
563 name, "serving nested instance exports not supported yet"
564 );
565 }
566 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
567 }
568 }
569 }
570 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
571 }
572 }
573 Ok(())
574}
575
576#[instrument(level = "trace", ret)]
577pub async fn handle_serve(
578 ServeArgs {
579 nats,
580 timeout,
581 prefix,
582 target,
583 group,
584 ref workload,
585 }: ServeArgs,
586) -> anyhow::Result<()> {
587 let nats = wrpc_cli::nats::connect(nats)
588 .await
589 .context("failed to connect to NATS")?;
590 let nats = Arc::new(nats);
591
592 let (pre, engine, guest_resources) =
593 instantiate_pre(WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER, None, workload).await?;
594
595 let clt = wrpc_transport_nats::Client::new(Arc::clone(&nats), target, None);
596 let srv = wrpc_transport_nats::Client::new(nats, prefix, group.map(Arc::from));
597 let mut handlers = JoinSet::new();
598 if guest_resources.is_empty() {
599 serve_stateless(&mut handlers, srv, clt, pre, &engine, *timeout).await?;
600 } else {
601 serve_shared(
602 &mut handlers,
603 srv,
604 new_store(&engine, clt, "reactor.wasm", *timeout),
605 pre,
606 guest_resources,
607 )
608 .await?;
609 }
610 while let Some(res) = handlers.join_next().await {
611 if let Err(err) = res {
612 error!(?err, "handler failed");
613 }
614 }
615 Ok(())
616}
617
618#[instrument(level = "trace", ret)]
619pub async fn run() -> anyhow::Result<()> {
620 wrpc_cli::tracing::init();
621 match Command::parse() {
622 Command::Run(args) => handle_run(args).await,
623 Command::Serve(args) => handle_serve(args).await,
624 }
625}