1use std::collections::HashMap;
2use std::io::Write;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6
7use anyhow::{bail, ensure, Context, Result};
8use bytes::BytesMut;
9use clap::Args;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
13use tracing::debug;
14
15use wash_lib::cli::{validate_component_id, CommandOutput};
16use wash_lib::config::DEFAULT_LATTICE;
17use wasmcloud_core::parse_wit_meta_from_operation;
18use wit_bindgen_wrpc::wrpc_transport::InvokeExt as _;
19
20use crate::util::{default_timeout_ms, extract_arg_value, msgpack_to_json_val};
21
/// URI scheme used by `HttpHandlerInvocationOpts::to_request` when no scheme option is supplied.
const DEFAULT_HTTP_SCHEME: &str = "http";
/// Host used by `HttpHandlerInvocationOpts::to_request` when no host option is supplied.
const DEFAULT_HTTP_HOST: &str = "localhost";
/// Port used by `HttpHandlerInvocationOpts::to_request` when no port option is supplied.
const DEFAULT_HTTP_PORT: u16 = 8080;
26
/// One test outcome decoded from a call response.
///
/// `call_output` deserializes a MessagePack response into a list of these when asked to
/// treat the response as test results, and `print_test_results` renders them.
#[derive(Deserialize)]
struct TestResult<'a> {
    /// Name of the test; falls back to an empty string when the field is absent.
    #[serde(default)]
    pub name: String,
    /// Whether the test passed; falls back to `false` when the field is absent.
    #[serde(default)]
    pub passed: bool,
    /// Raw bytes carried under the `snapData` key, borrowed from the decoded buffer.
    /// For failed tests, `print_test_results` attempts to parse these bytes as a JSON
    /// object with an `error` string field.
    #[serde(rename = "snapData")]
    #[serde(with = "serde_bytes")]
    #[serde(default)]
    pub snap_data: &'a [u8],
}
43
44fn print_test_results(results: &[TestResult]) {
50 #[derive(Deserialize)]
52 struct ErrorReport {
53 error: String,
54 }
55
56 let mut passed = 0u32;
57 let total = results.len() as u32;
58 let mut stdout = StandardStream::stdout(ColorChoice::Always);
61 let mut green = ColorSpec::new();
62 green.set_fg(Some(Color::Green));
63 let mut red = ColorSpec::new();
64 red.set_fg(Some(Color::Red));
65 for test in results.iter() {
66 if test.passed {
67 let _ = stdout.set_color(&green);
68 write!(&mut stdout, "Pass").unwrap();
69 let _ = stdout.reset();
70 writeln!(&mut stdout, ": {}", test.name).unwrap();
71 passed += 1;
72 } else {
73 let error_msg = serde_json::from_slice::<ErrorReport>(test.snap_data)
74 .map(|r| r.error)
75 .unwrap_or_default();
76 let _ = stdout.set_color(&red);
77 write!(&mut stdout, "Fail").unwrap();
78 let _ = stdout.reset();
79 writeln!(&mut stdout, ": {}", error_msg).unwrap();
80 }
81 }
82 let status_color = if passed == total { green } else { red };
83 write!(&mut stdout, "Test results: ").unwrap();
84 let _ = stdout.set_color(&status_color);
85 writeln!(&mut stdout, "{}/{} Passed", passed, total).unwrap();
86 let _ = stdout.set_color(&ColorSpec::new());
88 writeln!(&mut stdout).unwrap();
89}
90
// Argument container named `call` that wraps a flattened `CallCommand`.
//
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments on derived
// argument types into help/about text, so adding them would change the CLI's output.
#[derive(Debug, Args, Clone)]
#[clap(name = "call")]
pub struct CallCli {
    // The actual call arguments; flattened so its flags appear directly on this command.
    #[clap(flatten)]
    command: CallCommand,
}
97
98impl CallCli {
99 pub fn command(self) -> CallCommand {
100 self.command
101 }
102}
103
104pub async fn handle_command(
105 CallCommand {
106 component_id,
107 function,
108 opts,
109 http_handler_invocation_opts,
110 http_response_extract_json,
111 ..
112 }: CallCommand,
113) -> Result<CommandOutput> {
114 ensure!(!component_id.is_empty(), "component ID may not be empty");
115 debug!(
116 ?component_id,
117 ?function,
118 "calling component function over wRPC"
119 );
120
121 let lattice = opts
122 .lattice
123 .clone()
124 .unwrap_or_else(|| DEFAULT_LATTICE.to_string());
125
126 let nc = create_client_from_opts_wrpc(&opts)
127 .await
128 .context("failed to create async nats client")?;
129 let wrpc_client =
130 wrpc_transport_nats::Client::new(nc, format!("{}.{component_id}", &lattice), None).await?;
131
132 let (namespace, package, interface, name) = parse_wit_meta_from_operation(&function).context(
133 "Invalid function supplied. Must be in the form of `namespace:package/interface.function`",
134 )?;
135 let instance = format!("{namespace}:{package}/{interface}");
136 let name = name.context(
137 "Invalid function supplied. Must be in the form of `namespace:package/interface.function`",
138 )?;
139 debug!(
140 ?component_id,
141 ?instance,
142 ?name,
143 ?lattice,
144 "invoking component"
145 );
146
147 match function.as_str() {
148 "wrpc:http/incoming-handler.handle" | "wasi:http/incoming-handler.handle" => {
151 let request = http_handler_invocation_opts
152 .to_request()
153 .await
154 .context("failed to invoke handler with HTTP request options")?;
155 wrpc_invoke_http_handler(
156 wrpc_client,
157 &lattice,
158 &component_id,
159 opts.timeout_ms,
160 request,
161 http_response_extract_json,
162 )
163 .await
164 }
165 _ => {
167 wrpc_invoke_simple(
168 wrpc_client,
169 &lattice,
170 &component_id,
171 &instance,
172 &name,
173 opts.timeout_ms,
174 )
175 .await
176 }
177 }
178}
179
// Connection settings used to reach the NATS server when performing a call.
//
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments on derived
// argument types into help text, so adding them would change the CLI's output.
#[derive(Debug, Clone, Args)]
pub struct ConnectionOpts {
    // Host of the NATS server (`-r` / `--rpc-host`, default `127.0.0.1`). Joined with
    // `rpc_port` as `host:port` by `create_client_from_opts_wrpc`.
    #[clap(
        short = 'r',
        long = "rpc-host",
        env = "WASMCLOUD_RPC_HOST",
        default_value = "127.0.0.1"
    )]
    rpc_host: String,

    // Port of the NATS server (`-p` / `--rpc-port`, default `4222`). Kept as a string
    // because it is only ever interpolated into the connection URL.
    #[clap(
        short = 'p',
        long = "rpc-port",
        env = "WASMCLOUD_RPC_PORT",
        default_value = "4222"
    )]
    rpc_port: String,

    // JWT used for NATS authentication; must be given together with `rpc_seed`. The value
    // is resolved through `extract_arg_value` before use.
    // NOTE(review): presumably accepts either a file path or the literal JWT; confirm
    // against `extract_arg_value`.
    #[clap(
        long = "rpc-jwt",
        env = "WASMCLOUD_RPC_JWT",
        hide_env_values = true,
        requires = "rpc_seed"
    )]
    rpc_jwt: Option<String>,

    // Seed used to build the key pair that signs the server nonce during JWT
    // authentication; must be given together with `rpc_jwt`. Also resolved through
    // `extract_arg_value`.
    #[clap(
        long = "rpc-seed",
        env = "WASMCLOUD_RPC_SEED",
        hide_env_values = true,
        requires = "rpc_jwt"
    )]
    rpc_seed: Option<String>,

    // Path to a NATS credentials file; only consulted when no JWT is supplied.
    #[clap(long = "rpc-credsfile", env = "WASH_RPC_CREDS", hide_env_values = true)]
    rpc_credsfile: Option<PathBuf>,

    // Path to a CA certificate file. When set, it is added as a root certificate and TLS
    // is required for the connection.
    #[clap(
        long = "rpc-ca-file",
        env = "WASH_RPC_TLS_CA_FILE",
        hide_env_values = true
    )]
    rpc_ca_file: Option<PathBuf>,

    // Lattice name (`-x` / `--lattice`); `handle_command` falls back to `DEFAULT_LATTICE`
    // when it is not given.
    #[clap(short = 'x', long = "lattice", env = "WASMCLOUD_LATTICE")]
    lattice: Option<String>,

    // Invocation timeout in milliseconds (`-t` / `--rpc-timeout-ms`); the default comes
    // from `default_timeout_ms()`.
    #[clap(
        short = 't',
        long = "rpc-timeout-ms",
        default_value_t = default_timeout_ms(),
        env = "WASMCLOUD_RPC_TIMEOUT_MS"
    )]
    timeout_ms: u64,

    // Value of `--context`. It is not read anywhere in this file.
    // NOTE(review): presumably the name of a wash context to load settings from; confirm
    // with the callers that consume it.
    #[clap(long = "context")]
    pub context: Option<String>,
}
249
// Arguments of a single call: what to invoke, how to connect, and how to treat the result.
//
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments on derived
// argument types into help text, so adding them would change the CLI's output.
#[derive(Args, Debug, Clone)]
pub struct CallCommand {
    // NATS connection settings, flattened into this command's flags.
    #[clap(flatten)]
    opts: ConnectionOpts,

    // Positional argument: ID of the component to invoke, checked by
    // `validate_component_id` at parse time.
    #[clap(name = "component-id", value_parser = validate_component_id)]
    pub component_id: String,

    // Positional argument: the function to invoke, expected in the form
    // `namespace:package/interface.function`.
    #[clap(name = "function")]
    pub function: String,

    // When set, an HTTP handler response body is parsed as JSON and only that JSON is
    // returned, instead of the status/headers/body wrapper.
    #[clap(
        long = "http-response-extract-json",
        default_value_t = false,
        env = "WASH_CALL_HTTP_RESPONSE_EXTRACT_JSON"
    )]
    pub http_response_extract_json: bool,

    // Options describing the HTTP request to send when the target is an HTTP
    // `incoming-handler.handle` function.
    #[clap(flatten)]
    pub http_handler_invocation_opts: HttpHandlerInvocationOpts,
}
275
// Options describing the HTTP request that `to_request` builds for an HTTP handler
// invocation.
//
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments on derived
// argument types into help text, so adding them would change the CLI's output.
#[derive(Debug, Clone, Deserialize, Args)]
pub struct HttpHandlerInvocationOpts {
    // URI scheme of the request; `to_request` falls back to `DEFAULT_HTTP_SCHEME`.
    #[clap(long = "http-scheme", env = "WASH_CALL_INVOKE_HTTP_SCHEME")]
    http_scheme: Option<String>,

    // Host of the request URI; `to_request` falls back to `DEFAULT_HTTP_HOST`.
    #[clap(long = "http-host", env = "WASH_CALL_INVOKE_HTTP_HOST")]
    http_host: Option<String>,

    // Port of the request URI; `to_request` falls back to `DEFAULT_HTTP_PORT`.
    #[clap(long = "http-port", env = "WASH_CALL_INVOKE_HTTP_PORT")]
    http_port: Option<u16>,

    // HTTP method of the request; `to_request` falls back to `GET`.
    #[clap(long = "http-method", env = "WASH_CALL_INVOKE_HTTP_METHOD")]
    http_method: Option<String>,

    // Request body given inline; mutually exclusive with `http_body_path`.
    #[clap(
        long = "http-body",
        env = "WASH_CALL_INVOKE_HTTP_BODY",
        conflicts_with = "http_body_path"
    )]
    http_body: Option<String>,

    // Path of a file whose contents are read as the request body; mutually exclusive with
    // `http_body`.
    #[clap(
        long = "http-body-path",
        env = "WASH_CALL_INVOKE_HTTP_BODY_PATH",
        conflicts_with = "http_body"
    )]
    http_body_path: Option<PathBuf>,

    // Value for the `Content-Type` header; the header is omitted when this is not set.
    #[clap(long = "http-content-type", env = "WASH_CALL_INVOKE_HTTP_CONTENT_TYPE")]
    http_content_type: Option<String>,
}
315
316impl HttpHandlerInvocationOpts {
317 pub async fn to_request(self) -> Result<http::Request<String>> {
318 let HttpHandlerInvocationOpts {
319 http_scheme,
320 http_host,
321 http_port,
322 http_method,
323 http_body,
324 http_body_path,
325 http_content_type,
326 ..
327 } = self;
328
329 let host = http_host.unwrap_or_else(|| DEFAULT_HTTP_HOST.into());
330 let port = http_port.unwrap_or(DEFAULT_HTTP_PORT);
331 let scheme = http_scheme.unwrap_or_else(|| DEFAULT_HTTP_SCHEME.into());
332 let method =
333 http::method::Method::from_str(http_method.unwrap_or_else(|| "GET".into()).as_str())
334 .context("failed to read method from input")?;
335 debug!(?host, ?port, ?scheme, ?method, content_type = ?http_content_type, "building request from options");
336
337 let http_body = match (http_body, http_body_path) {
338 (Some(s), _) => s,
339 (_, Some(p)) => tokio::fs::read_to_string(p)
340 .await
341 .context("failed to read http body file")?,
342 (None, None) => String::new(),
343 };
344
345 let mut req = http::Request::builder()
347 .uri(format!("{scheme}://{host}:{port}"))
348 .method(method);
349 if let Some(content_type) = http_content_type {
350 req = req.header("Content-Type", content_type);
351 }
352 req.body(http_body)
353 .context("failed to build HTTP request from handler invocation options")
354 }
355}
356
/// Serializable summary of an HTTP handler response, emitted by
/// `wrpc_invoke_http_handler` when the body is not extracted as JSON.
#[derive(Debug, Clone, Serialize)]
struct HttpResponse {
    /// Numeric HTTP status code of the response.
    status: u16,
    /// Response headers by name. Values that are not valid header strings are stored as
    /// empty strings, and only one value is kept per header name.
    headers: HashMap<String, String>,
    /// Response body decoded as UTF-8.
    body: String,
}
364
365async fn wrpc_invoke_http_handler(
367 client: wrpc_transport_nats::Client,
368 lattice: &str,
369 component_id: &str,
370 timeout_ms: u64,
371 request: http::request::Request<String>,
372 extract_json: bool,
373) -> Result<CommandOutput> {
374 use futures::StreamExt;
375 use wrpc_interface_http::InvokeIncomingHandler as _;
376
377 let result = tokio::time::timeout(
378 std::time::Duration::from_millis(timeout_ms),
379 client
380 .invoke_handle_http(Some(gen_wash_call_headers()), request)
381 )
382 .await
383 .with_context(|| format!("component invocation timeout, is component [{component_id}] running in lattice [{lattice}]?"))?
384 .context("failed to perform HTTP request")?;
385
386 match result {
387 (Ok(mut resp), _errs, io) => {
388 if let Some(io) = io {
389 io.await.context("failed to complete async I/O")?;
390 }
391
392 let status = resp.status().as_u16();
393 let headers =
394 HashMap::<String, String>::from_iter(resp.headers().into_iter().map(|(k, v)| {
395 (
396 k.as_str().into(),
397 v.to_str().map(|v| v.to_string()).unwrap_or_default(),
398 )
399 }));
400
401 let mut body = BytesMut::new();
403 while let Some(bytes) = resp.body_mut().body.next().await {
404 body.extend(bytes);
405 }
406 let body = body.freeze();
407
408 let output = if extract_json {
411 let body_json = serde_json::from_slice(&body)
412 .context("failed to parse response body bytes into a valid JSON object")?;
413 CommandOutput::new(
414 serde_json::to_string_pretty(&body_json)
415 .context("failed to print http response JSON")?,
416 HashMap::from([("response".into(), body_json)]),
417 )
418 } else {
419 let http_resp = HttpResponse {
420 status,
421 headers,
422 body: String::from_utf8(Vec::from(body))
423 .context("failed to parse returned bytes as string")?,
424 };
425 CommandOutput::new(
426 serde_json::to_string_pretty(&http_resp)
427 .context("failed to print http response JSON")?,
428 HashMap::from([(
429 "response".into(),
430 serde_json::to_value(&http_resp)
431 .context("failed to convert http response to value")?,
432 )]),
433 )
434 };
435
436 Ok(output)
437 }
438 _ => bail!("unexpected response after HTTP wRPC invocation"),
440 }
441}
442
443async fn wrpc_invoke_simple(
445 client: wrpc_transport_nats::Client,
446 lattice: &str,
447 component_id: &str,
448 instance: &str,
449 function_name: &str,
450 timeout_ms: u64,
451) -> Result<CommandOutput> {
452 let result = client
453 .timeout(Duration::from_millis(timeout_ms))
454 .invoke_values_blocking::<_, ((),), (String,)>(
455 Some(gen_wash_call_headers()),
456 instance,
457 function_name,
458 ((),),
459 &[[]; 0],
460 )
461 .await
462 .with_context(|| format!("timed out invoking component, is component [{component_id}] running in lattice [{lattice}]?"));
463
464 match result {
465 Ok((result,)) => {
466 Ok(CommandOutput::new(result.clone(), HashMap::from([("result".to_string(), json!(result))])))
467 }
468 Err(e) if e.to_string().contains("transmission failed") => bail!("No component responded to your request, ensure component {component_id} is running in lattice {lattice}"),
469 Err(e) => bail!("Error invoking component: {e}"),
470 }
471}
472
473pub fn call_output(
475 response: Vec<u8>,
476 save_output: Option<PathBuf>,
477 bin: char,
478 is_test: bool,
479) -> Result<CommandOutput> {
480 if let Some(ref save_path) = save_output {
481 std::fs::write(save_path, response)
482 .with_context(|| format!("Error saving results to {}", &save_path.display()))?;
483
484 return Ok(CommandOutput::new(
485 "",
486 HashMap::<String, serde_json::Value>::new(),
487 ));
488 }
489
490 if is_test {
491 let test_results: Vec<TestResult> =
493 rmp_serde::from_slice(&response).with_context(|| {
494 format!(
495 "Error interpreting response as TestResults. Response: {}",
496 String::from_utf8_lossy(&response)
497 )
498 })?;
499
500 print_test_results(&test_results);
501 return Ok(CommandOutput::new(
502 "",
503 HashMap::<String, serde_json::Value>::new(),
504 ));
505 }
506
507 let json = HashMap::from([
508 (
509 "response".to_string(),
510 msgpack_to_json_val(response.clone(), bin),
511 ),
512 ("success".to_string(), serde_json::json!(true)),
513 ]);
514
515 Ok(CommandOutput::new(
516 format!(
517 "\nCall response (raw): {}",
518 String::from_utf8_lossy(&response)
519 ),
520 json,
521 ))
522}
523
524async fn create_client_from_opts_wrpc(opts: &ConnectionOpts) -> Result<async_nats::Client> {
529 let ConnectionOpts {
530 rpc_host: host,
531 rpc_port: port,
532 rpc_jwt: jwt,
533 rpc_seed: seed,
534 rpc_credsfile: credsfile,
535 rpc_ca_file: tls_ca_file,
536 ..
537 } = opts;
538
539 let nats_url = format!("{host}:{port}");
540 use async_nats::ConnectOptions;
541
542 let nc = if let Some(jwt_file) = jwt {
543 let jwt_contents = extract_arg_value(jwt_file)
544 .with_context(|| format!("Failed to extract jwt contents from {}", &jwt_file))?;
545 let kp = std::sync::Arc::new(if let Some(seed) = seed {
546 nkeys::KeyPair::from_seed(
547 &extract_arg_value(seed)
548 .with_context(|| format!("Failed to extract seed value {}", &seed))?,
549 )
550 .with_context(|| format!("Failed to create keypair from seed value {}", &seed))?
551 } else {
552 nkeys::KeyPair::new_user()
553 });
554
555 let mut opts = async_nats::ConnectOptions::with_jwt(jwt_contents, move |nonce| {
557 let key_pair = kp.clone();
558 async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
559 });
560
561 if let Some(ref ca_file) = tls_ca_file {
562 opts = opts
563 .add_root_certificates(ca_file.clone())
564 .require_tls(true);
565 }
566
567 opts.connect(&nats_url).await.with_context(|| {
568 format!(
569 "Failed to connect to NATS server {}:{} while creating client",
570 &host, &port
571 )
572 })?
573 } else if let Some(credsfile_path) = credsfile {
574 let mut opts = ConnectOptions::with_credentials_file(credsfile_path.clone())
575 .await
576 .with_context(|| {
577 format!(
578 "Failed to authenticate to NATS with credentials file {:?}",
579 &credsfile_path
580 )
581 })?;
582
583 if let Some(ca_file) = tls_ca_file {
584 opts = opts
585 .add_root_certificates(ca_file.clone())
586 .require_tls(true);
587 }
588
589 opts.connect(&nats_url).await.with_context(|| {
590 format!(
591 "Failed to connect to NATS {} with credentials file {:?}",
592 &nats_url, &credsfile_path
593 )
594 })?
595 } else {
596 let mut opts = ConnectOptions::new();
597
598 if let Some(ca_file) = tls_ca_file {
599 opts = opts
600 .add_root_certificates(ca_file.clone())
601 .require_tls(true);
602 }
603
604 opts.connect(&nats_url)
605 .await
606 .with_context(|| format!("Failed to connect to NATS {}", &nats_url))?
607 };
608 Ok(nc)
609}
610
611fn gen_wash_call_headers() -> async_nats::HeaderMap {
612 let mut headers = async_nats::HeaderMap::new();
613 headers.insert("source-id", "wash");
614 headers
615}
616
#[cfg(test)]
mod test {
    use super::CallCommand;
    use anyhow::Result;
    use clap::Parser;

    const RPC_HOST: &str = "127.0.0.1";
    const RPC_PORT: &str = "4222";
    const DEFAULT_LATTICE: &str = "default";

    const COMPONENT_ID: &str = "MDPDJEYIAK6MACO67PRFGOSSLODBISK4SCEYDY3HEOY4P5CVJN6UCWUK";

    // Minimal parser wrapper so `CallCommand` (an `Args` type) can be parsed on its own.
    #[derive(Debug, Parser)]
    struct Cmd {
        #[clap(flatten)]
        command: CallCommand,
    }

    /// Parses a full set of connection flags plus both positionals and checks that each
    /// value lands in the expected field.
    #[test]
    fn test_rpc_comprehensive() -> Result<()> {
        let argv = [
            "call",
            "--context",
            "some-context",
            "--lattice",
            DEFAULT_LATTICE,
            "--rpc-host",
            RPC_HOST,
            "--rpc-port",
            RPC_PORT,
            "--rpc-timeout-ms",
            "0",
            COMPONENT_ID,
            "wasmcloud:test/handle.operation",
        ];
        let parsed = Cmd::try_parse_from(argv)?;

        // `CallCommand` is a struct, so the destructuring is irrefutable.
        let CallCommand {
            opts,
            component_id,
            function,
            ..
        } = parsed.command;

        assert_eq!(opts.rpc_host, RPC_HOST);
        assert_eq!(opts.rpc_port, RPC_PORT);
        assert_eq!(opts.lattice.as_deref(), Some(DEFAULT_LATTICE));
        assert_eq!(opts.timeout_ms, 0);
        assert_eq!(opts.context.as_deref(), Some("some-context"));
        assert_eq!(component_id, COMPONENT_ID);
        assert_eq!(function, "wasmcloud:test/handle.operation");
        Ok(())
    }
}