wash_cli/
call.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6
7use anyhow::{bail, ensure, Context, Result};
8use bytes::BytesMut;
9use clap::Args;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
13use tracing::debug;
14
15use wash_lib::cli::{validate_component_id, CommandOutput};
16use wash_lib::config::DEFAULT_LATTICE;
17use wasmcloud_core::parse_wit_meta_from_operation;
18use wit_bindgen_wrpc::wrpc_transport::InvokeExt as _;
19
20use crate::util::{default_timeout_ms, extract_arg_value, msgpack_to_json_val};
21
22const DEFAULT_HTTP_SCHEME: &str = "http";
23const DEFAULT_HTTP_HOST: &str = "localhost";
24/// Default port used by wasmCloud HTTP server provider
25const DEFAULT_HTTP_PORT: u16 = 8080;
26
27#[derive(Deserialize)]
28struct TestResult<'a> {
29    /// test case name
30    #[serde(default)]
31    pub name: String,
32    /// true if the test case passed
33    #[serde(default)]
34    pub passed: bool,
35    /// (optional) more detailed results, if available.
36    /// data is snap-compressed json
37    /// failed tests should have a firsts-level key called "error".
38    #[serde(rename = "snapData")]
39    #[serde(with = "serde_bytes")]
40    #[serde(default)]
41    pub snap_data: &'a [u8],
42}
43
44/// Prints test results (with handy color!) to the terminal
45// NOTE(thomastaylor312): We are unwrapping all writing IO errors (which matches the behavior in the
46// println! macro) and swallowing the color change errors as there isn't much we can do if they fail
47// (and a color change isn't the end of the world). We may want to update this function in the
48// future to return an io::Result
49fn print_test_results(results: &[TestResult]) {
50    // structure for deserializing error results
51    #[derive(Deserialize)]
52    struct ErrorReport {
53        error: String,
54    }
55
56    let mut passed = 0u32;
57    let total = results.len() as u32;
58    // TODO(thomastaylor312): We can probably improve this a bit by using the `atty` crate to choose
59    // whether or not to colorize the text
60    let mut stdout = StandardStream::stdout(ColorChoice::Always);
61    let mut green = ColorSpec::new();
62    green.set_fg(Some(Color::Green));
63    let mut red = ColorSpec::new();
64    red.set_fg(Some(Color::Red));
65    for test in results.iter() {
66        if test.passed {
67            let _ = stdout.set_color(&green);
68            write!(&mut stdout, "Pass").unwrap();
69            let _ = stdout.reset();
70            writeln!(&mut stdout, ": {}", test.name).unwrap();
71            passed += 1;
72        } else {
73            let error_msg = serde_json::from_slice::<ErrorReport>(test.snap_data)
74                .map(|r| r.error)
75                .unwrap_or_default();
76            let _ = stdout.set_color(&red);
77            write!(&mut stdout, "Fail").unwrap();
78            let _ = stdout.reset();
79            writeln!(&mut stdout, ": {}", error_msg).unwrap();
80        }
81    }
82    let status_color = if passed == total { green } else { red };
83    write!(&mut stdout, "Test results: ").unwrap();
84    let _ = stdout.set_color(&status_color);
85    writeln!(&mut stdout, "{}/{} Passed", passed, total).unwrap();
86    // Reset the color settings back to what the user configured
87    let _ = stdout.set_color(&ColorSpec::new());
88    writeln!(&mut stdout).unwrap();
89}
90
91#[derive(Debug, Args, Clone)]
92#[clap(name = "call")]
93pub struct CallCli {
94    #[clap(flatten)]
95    command: CallCommand,
96}
97
98impl CallCli {
99    pub fn command(self) -> CallCommand {
100        self.command
101    }
102}
103
104pub async fn handle_command(
105    CallCommand {
106        component_id,
107        function,
108        opts,
109        http_handler_invocation_opts,
110        http_response_extract_json,
111        ..
112    }: CallCommand,
113) -> Result<CommandOutput> {
114    ensure!(!component_id.is_empty(), "component ID may not be empty");
115    debug!(
116        ?component_id,
117        ?function,
118        "calling component function over wRPC"
119    );
120
121    let lattice = opts
122        .lattice
123        .clone()
124        .unwrap_or_else(|| DEFAULT_LATTICE.to_string());
125
126    let nc = create_client_from_opts_wrpc(&opts)
127        .await
128        .context("failed to create async nats client")?;
129    let wrpc_client =
130        wrpc_transport_nats::Client::new(nc, format!("{}.{component_id}", &lattice), None).await?;
131
132    let (namespace, package, interface, name) = parse_wit_meta_from_operation(&function).context(
133        "Invalid function supplied. Must be in the form of `namespace:package/interface.function`",
134    )?;
135    let instance = format!("{namespace}:{package}/{interface}");
136    let name = name.context(
137        "Invalid function supplied. Must be in the form of `namespace:package/interface.function`",
138    )?;
139    debug!(
140        ?component_id,
141        ?instance,
142        ?name,
143        ?lattice,
144        "invoking component"
145    );
146
147    match function.as_str() {
148        // If we receive a HTTP call we must translate the provided data into a HTTP request that
149        // can be used with wRPC and send that over the wire
150        "wrpc:http/incoming-handler.handle" | "wasi:http/incoming-handler.handle" => {
151            let request = http_handler_invocation_opts
152                .to_request()
153                .await
154                .context("failed to invoke handler with HTTP request options")?;
155            wrpc_invoke_http_handler(
156                wrpc_client,
157                &lattice,
158                &component_id,
159                opts.timeout_ms,
160                request,
161                http_response_extract_json,
162            )
163            .await
164        }
165        // Assume the call is a function that takes no input and produces a string
166        _ => {
167            wrpc_invoke_simple(
168                wrpc_client,
169                &lattice,
170                &component_id,
171                &instance,
172                &name,
173                opts.timeout_ms,
174            )
175            .await
176        }
177    }
178}
179
180#[derive(Debug, Clone, Args)]
181pub struct ConnectionOpts {
182    /// RPC Host for connection, defaults to 127.0.0.1 for local nats
183    #[clap(
184        short = 'r',
185        long = "rpc-host",
186        env = "WASMCLOUD_RPC_HOST",
187        default_value = "127.0.0.1"
188    )]
189    rpc_host: String,
190
191    /// RPC Port for connections, defaults to 4222 for local nats
192    #[clap(
193        short = 'p',
194        long = "rpc-port",
195        env = "WASMCLOUD_RPC_PORT",
196        default_value = "4222"
197    )]
198    rpc_port: String,
199
200    /// JWT file for RPC authentication. Must be supplied with rpc_seed.
201    #[clap(
202        long = "rpc-jwt",
203        env = "WASMCLOUD_RPC_JWT",
204        hide_env_values = true,
205        requires = "rpc_seed"
206    )]
207    rpc_jwt: Option<String>,
208
209    /// Seed file or literal for RPC authentication. Must be supplied with rpc_jwt.
210    #[clap(
211        long = "rpc-seed",
212        env = "WASMCLOUD_RPC_SEED",
213        hide_env_values = true,
214        requires = "rpc_jwt"
215    )]
216    rpc_seed: Option<String>,
217
218    /// Credsfile for RPC authentication. Combines rpc_seed and rpc_jwt.
219    /// See https://docs.nats.io/using-nats/developer/connecting/creds for details.
220    #[clap(long = "rpc-credsfile", env = "WASH_RPC_CREDS", hide_env_values = true)]
221    rpc_credsfile: Option<PathBuf>,
222
223    /// CA file for RPC authentication.
224    /// See https://docs.nats.io/using-nats/developer/security/securing_nats for details.
225    #[clap(
226        long = "rpc-ca-file",
227        env = "WASH_RPC_TLS_CA_FILE",
228        hide_env_values = true
229    )]
230    rpc_ca_file: Option<PathBuf>,
231
232    /// Lattice for wasmcloud command interface, defaults to "default"
233    #[clap(short = 'x', long = "lattice", env = "WASMCLOUD_LATTICE")]
234    lattice: Option<String>,
235
236    /// Timeout length for RPC, defaults to 2000 milliseconds
237    #[clap(
238        short = 't',
239        long = "rpc-timeout-ms",
240        default_value_t = default_timeout_ms(),
241        env = "WASMCLOUD_RPC_TIMEOUT_MS"
242    )]
243    timeout_ms: u64,
244
245    /// Name of the context to use for RPC connection, authentication, and cluster seed invocation signing
246    #[clap(long = "context")]
247    pub context: Option<String>,
248}
249
250#[derive(Args, Debug, Clone)]
251pub struct CallCommand {
252    #[clap(flatten)]
253    opts: ConnectionOpts,
254
255    /// The unique component identifier of the component to invoke
256    #[clap(name = "component-id", value_parser = validate_component_id)]
257    pub component_id: String,
258
259    /// Fully qualified WIT export to invoke on the component, e.g. `wasi:cli/run.run`
260    #[clap(name = "function")]
261    pub function: String,
262
263    /// Whether the content of the HTTP response body should be parsed as JSON and returned directly
264    #[clap(
265        long = "http-response-extract-json",
266        default_value_t = false,
267        env = "WASH_CALL_HTTP_RESPONSE_EXTRACT_JSON"
268    )]
269    pub http_response_extract_json: bool,
270
271    /// Customizable options related to the HTTP handler invocation (HTTP path, method, etc)
272    #[clap(flatten)]
273    pub http_handler_invocation_opts: HttpHandlerInvocationOpts,
274}
275
276/// Options that customize the HTTP request that is fed to a HTTP handler when using `wash call`
277#[derive(Debug, Clone, Deserialize, Args)]
278pub struct HttpHandlerInvocationOpts {
279    /// Scheme to use when making the HTTP request
280    #[clap(long = "http-scheme", env = "WASH_CALL_INVOKE_HTTP_SCHEME")]
281    http_scheme: Option<String>,
282
283    /// Host to use when making the HTTP request
284    #[clap(long = "http-host", env = "WASH_CALL_INVOKE_HTTP_HOST")]
285    http_host: Option<String>,
286
287    /// Port on which to make the HTTP request
288    #[clap(long = "http-port", env = "WASH_CALL_INVOKE_HTTP_PORT")]
289    http_port: Option<u16>,
290
291    /// Method to use when making the HTTP request
292    #[clap(long = "http-method", env = "WASH_CALL_INVOKE_HTTP_METHOD")]
293    http_method: Option<String>,
294
295    /// Stringified body contents to use when making the HTTP request
296    #[clap(
297        long = "http-body",
298        env = "WASH_CALL_INVOKE_HTTP_BODY",
299        conflicts_with = "http_body_path"
300    )]
301    http_body: Option<String>,
302
303    /// Path to a file to use as the body when making a HTTP request
304    #[clap(
305        long = "http-body-path",
306        env = "WASH_CALL_INVOKE_HTTP_BODY_PATH",
307        conflicts_with = "http_body"
308    )]
309    http_body_path: Option<PathBuf>,
310
311    /// Content type header to pass with the request
312    #[clap(long = "http-content-type", env = "WASH_CALL_INVOKE_HTTP_CONTENT_TYPE")]
313    http_content_type: Option<String>,
314}
315
316impl HttpHandlerInvocationOpts {
317    pub async fn to_request(self) -> Result<http::Request<String>> {
318        let HttpHandlerInvocationOpts {
319            http_scheme,
320            http_host,
321            http_port,
322            http_method,
323            http_body,
324            http_body_path,
325            http_content_type,
326            ..
327        } = self;
328
329        let host = http_host.unwrap_or_else(|| DEFAULT_HTTP_HOST.into());
330        let port = http_port.unwrap_or(DEFAULT_HTTP_PORT);
331        let scheme = http_scheme.unwrap_or_else(|| DEFAULT_HTTP_SCHEME.into());
332        let method =
333            http::method::Method::from_str(http_method.unwrap_or_else(|| "GET".into()).as_str())
334                .context("failed to read method from input")?;
335        debug!(?host, ?port, ?scheme, ?method, content_type = ?http_content_type, "building request from options");
336
337        let http_body = match (http_body, http_body_path) {
338            (Some(s), _) => s,
339            (_, Some(p)) => tokio::fs::read_to_string(p)
340                .await
341                .context("failed to read http body file")?,
342            (None, None) => String::new(),
343        };
344
345        // Build the HTTP request
346        let mut req = http::Request::builder()
347            .uri(format!("{scheme}://{host}:{port}"))
348            .method(method);
349        if let Some(content_type) = http_content_type {
350            req = req.header("Content-Type", content_type);
351        }
352        req.body(http_body)
353            .context("failed to build HTTP request from handler invocation options")
354    }
355}
356
357/// Utility type used mostly for printing HTTP responses to the console as JSON
358#[derive(Debug, Clone, Serialize)]
359struct HttpResponse {
360    status: u16,
361    headers: HashMap<String, String>,
362    body: String,
363}
364
365/// Invoke a wRPC endpoint that takes a HTTP request (usually `wasi:http/incoming-handler.handle`);
366async fn wrpc_invoke_http_handler(
367    client: wrpc_transport_nats::Client,
368    lattice: &str,
369    component_id: &str,
370    timeout_ms: u64,
371    request: http::request::Request<String>,
372    extract_json: bool,
373) -> Result<CommandOutput> {
374    use futures::StreamExt;
375    use wrpc_interface_http::InvokeIncomingHandler as _;
376
377    let result = tokio::time::timeout(
378       std::time::Duration::from_millis(timeout_ms),
379        client
380            .invoke_handle_http(Some(gen_wash_call_headers()), request)
381  )
382   .await
383   .with_context(|| format!("component invocation timeout, is component [{component_id}] running in lattice [{lattice}]?"))?
384   .context("failed to perform HTTP request")?;
385
386    match result {
387        (Ok(mut resp), _errs, io) => {
388            if let Some(io) = io {
389                io.await.context("failed to complete async I/O")?;
390            }
391
392            let status = resp.status().as_u16();
393            let headers =
394                HashMap::<String, String>::from_iter(resp.headers().into_iter().map(|(k, v)| {
395                    (
396                        k.as_str().into(),
397                        v.to_str().map(|v| v.to_string()).unwrap_or_default(),
398                    )
399                }));
400
401            // Read the incoming body into a string
402            let mut body = BytesMut::new();
403            while let Some(bytes) = resp.body_mut().body.next().await {
404                body.extend(bytes);
405            }
406            let body = body.freeze();
407
408            // If the option for parsing the response as JSON was provided, parse it directly,
409            // and return that as JSON
410            let output = if extract_json {
411                let body_json = serde_json::from_slice(&body)
412                    .context("failed to parse response body bytes into a valid JSON object")?;
413                CommandOutput::new(
414                    serde_json::to_string_pretty(&body_json)
415                        .context("failed to print http response JSON")?,
416                    HashMap::from([("response".into(), body_json)]),
417                )
418            } else {
419                let http_resp = HttpResponse {
420                    status,
421                    headers,
422                    body: String::from_utf8(Vec::from(body))
423                        .context("failed to parse returned bytes as string")?,
424                };
425                CommandOutput::new(
426                    serde_json::to_string_pretty(&http_resp)
427                        .context("failed to print http response JSON")?,
428                    HashMap::from([(
429                        "response".into(),
430                        serde_json::to_value(&http_resp)
431                            .context("failed to convert http response to value")?,
432                    )]),
433                )
434            };
435
436            Ok(output)
437        }
438        // For all other responses, something has gone wrong
439        _ => bail!("unexpected response after HTTP wRPC invocation"),
440    }
441}
442
443/// Invoke a wRPC endpoint that takes nothing and returns a string
444async fn wrpc_invoke_simple(
445    client: wrpc_transport_nats::Client,
446    lattice: &str,
447    component_id: &str,
448    instance: &str,
449    function_name: &str,
450    timeout_ms: u64,
451) -> Result<CommandOutput> {
452    let result = client
453           .timeout(Duration::from_millis(timeout_ms))
454           .invoke_values_blocking::<_, ((),), (String,)>(
455               Some(gen_wash_call_headers()),
456               instance,
457               function_name,
458               ((),),
459               &[[]; 0],
460           )
461   .await
462   .with_context(|| format!("timed out invoking component, is component [{component_id}] running in lattice [{lattice}]?"));
463
464    match result {
465       Ok((result,)) => {
466               Ok(CommandOutput::new(result.clone(), HashMap::from([("result".to_string(), json!(result))])))
467       }
468       Err(e) if e.to_string().contains("transmission failed") => bail!("No component responded to your request, ensure component {component_id} is running in lattice {lattice}"),
469       Err(e) => bail!("Error invoking component: {e}"),
470   }
471}
472
473// Helper output functions, used to ensure consistent output between call & standalone commands
474pub fn call_output(
475    response: Vec<u8>,
476    save_output: Option<PathBuf>,
477    bin: char,
478    is_test: bool,
479) -> Result<CommandOutput> {
480    if let Some(ref save_path) = save_output {
481        std::fs::write(save_path, response)
482            .with_context(|| format!("Error saving results to {}", &save_path.display()))?;
483
484        return Ok(CommandOutput::new(
485            "",
486            HashMap::<String, serde_json::Value>::new(),
487        ));
488    }
489
490    if is_test {
491        // try to decode it as TestResults, otherwise dump as text
492        let test_results: Vec<TestResult> =
493            rmp_serde::from_slice(&response).with_context(|| {
494                format!(
495                    "Error interpreting response as TestResults. Response: {}",
496                    String::from_utf8_lossy(&response)
497                )
498            })?;
499
500        print_test_results(&test_results);
501        return Ok(CommandOutput::new(
502            "",
503            HashMap::<String, serde_json::Value>::new(),
504        ));
505    }
506
507    let json = HashMap::from([
508        (
509            "response".to_string(),
510            msgpack_to_json_val(response.clone(), bin),
511        ),
512        ("success".to_string(), serde_json::json!(true)),
513    ]);
514
515    Ok(CommandOutput::new(
516        format!(
517            "\nCall response (raw): {}",
518            String::from_utf8_lossy(&response)
519        ),
520        json,
521    ))
522}
523
524/// Create an async nats client which is meant to work with [`async_nats_wrpc`]
525///
526/// Normally we would use `create_nats_client_from_opts` here, but until the schism between [`async_nats_wrpc`]
527/// and [`async_nats`] is resolved, we must replicate that logic here, as upstream `async_nats` does not match.
528async fn create_client_from_opts_wrpc(opts: &ConnectionOpts) -> Result<async_nats::Client> {
529    let ConnectionOpts {
530        rpc_host: host,
531        rpc_port: port,
532        rpc_jwt: jwt,
533        rpc_seed: seed,
534        rpc_credsfile: credsfile,
535        rpc_ca_file: tls_ca_file,
536        ..
537    } = opts;
538
539    let nats_url = format!("{host}:{port}");
540    use async_nats::ConnectOptions;
541
542    let nc = if let Some(jwt_file) = jwt {
543        let jwt_contents = extract_arg_value(jwt_file)
544            .with_context(|| format!("Failed to extract jwt contents from {}", &jwt_file))?;
545        let kp = std::sync::Arc::new(if let Some(seed) = seed {
546            nkeys::KeyPair::from_seed(
547                &extract_arg_value(seed)
548                    .with_context(|| format!("Failed to extract seed value {}", &seed))?,
549            )
550            .with_context(|| format!("Failed to create keypair from seed value {}", &seed))?
551        } else {
552            nkeys::KeyPair::new_user()
553        });
554
555        // You must provide the JWT via a closure
556        let mut opts = async_nats::ConnectOptions::with_jwt(jwt_contents, move |nonce| {
557            let key_pair = kp.clone();
558            async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
559        });
560
561        if let Some(ref ca_file) = tls_ca_file {
562            opts = opts
563                .add_root_certificates(ca_file.clone())
564                .require_tls(true);
565        }
566
567        opts.connect(&nats_url).await.with_context(|| {
568            format!(
569                "Failed to connect to NATS server {}:{} while creating client",
570                &host, &port
571            )
572        })?
573    } else if let Some(credsfile_path) = credsfile {
574        let mut opts = ConnectOptions::with_credentials_file(credsfile_path.clone())
575            .await
576            .with_context(|| {
577                format!(
578                    "Failed to authenticate to NATS with credentials file {:?}",
579                    &credsfile_path
580                )
581            })?;
582
583        if let Some(ca_file) = tls_ca_file {
584            opts = opts
585                .add_root_certificates(ca_file.clone())
586                .require_tls(true);
587        }
588
589        opts.connect(&nats_url).await.with_context(|| {
590            format!(
591                "Failed to connect to NATS {} with credentials file {:?}",
592                &nats_url, &credsfile_path
593            )
594        })?
595    } else {
596        let mut opts = ConnectOptions::new();
597
598        if let Some(ca_file) = tls_ca_file {
599            opts = opts
600                .add_root_certificates(ca_file.clone())
601                .require_tls(true);
602        }
603
604        opts.connect(&nats_url)
605            .await
606            .with_context(|| format!("Failed to connect to NATS {}", &nats_url))?
607    };
608    Ok(nc)
609}
610
611fn gen_wash_call_headers() -> async_nats::HeaderMap {
612    let mut headers = async_nats::HeaderMap::new();
613    headers.insert("source-id", "wash");
614    headers
615}
616
617#[cfg(test)]
618mod test {
619    use super::CallCommand;
620    use anyhow::Result;
621    use clap::Parser;
622
623    const RPC_HOST: &str = "127.0.0.1";
624    const RPC_PORT: &str = "4222";
625    const DEFAULT_LATTICE: &str = "default";
626
627    const COMPONENT_ID: &str = "MDPDJEYIAK6MACO67PRFGOSSLODBISK4SCEYDY3HEOY4P5CVJN6UCWUK";
628
629    #[derive(Debug, Parser)]
630    struct Cmd {
631        #[clap(flatten)]
632        command: CallCommand,
633    }
634
635    #[test]
636    fn test_rpc_comprehensive() -> Result<()> {
637        let call_all: Cmd = Parser::try_parse_from([
638            "call",
639            "--context",
640            "some-context",
641            "--lattice",
642            DEFAULT_LATTICE,
643            "--rpc-host",
644            RPC_HOST,
645            "--rpc-port",
646            RPC_PORT,
647            "--rpc-timeout-ms",
648            "0",
649            COMPONENT_ID,
650            "wasmcloud:test/handle.operation",
651        ])?;
652        match call_all.command {
653            CallCommand {
654                opts,
655                component_id,
656                function,
657                ..
658            } => {
659                assert_eq!(&opts.rpc_host, RPC_HOST);
660                assert_eq!(&opts.rpc_port, RPC_PORT);
661                assert_eq!(&opts.lattice.unwrap(), DEFAULT_LATTICE);
662                assert_eq!(opts.timeout_ms, 0);
663                assert_eq!(opts.context, Some("some-context".to_string()));
664                assert_eq!(component_id, COMPONENT_ID);
665                assert_eq!(function, "wasmcloud:test/handle.operation");
666            }
667            #[allow(unreachable_patterns)]
668            cmd => panic!("call constructed incorrect command: {cmd:?}"),
669        }
670        Ok(())
671    }
672}