wash_lib/
component.rs

1use std::collections::{BTreeMap, HashMap};
2
3use anyhow::{bail, Context, Result};
4use tokio::time::Duration;
5use wasmcloud_control_interface::{Client as CtlClient, CtlResponse};
6
7use crate::common::boxed_err_to_anyhow;
8use crate::config::DEFAULT_START_COMPONENT_TIMEOUT_MS;
9use crate::wait::{wait_for_component_scaled_event, FindEventOutcome};
10
11/// Information related to a component scale
12pub struct ComponentScaledInfo {
13    pub host_id: String,
14    pub component_ref: String,
15    pub component_id: String,
16}
17
18/// Arguments required when scaling an component
19///
20/// # Properties
21/// * `client` - The control interface client
22/// * `host_id` - The ID of the host where the component is running
23/// * `component_id` - The ID of the component to scale
24/// * `component_ref` - The reference of the component to scale
25/// * `max_instances` - The maximum number of instances to scale to
26/// * `annotations` - Optional annotations to apply to the component
27pub struct ScaleComponentArgs<'a> {
28    /// The control interface client
29    pub client: &'a CtlClient,
30    /// The ID of the host where the component is running
31    pub host_id: &'a str,
32    /// The ID of the component to scale
33    pub component_id: &'a str,
34    /// The reference of the component to scale
35    pub component_ref: &'a str,
36    /// The maximum number of instances to scale to
37    pub max_instances: u32,
38    /// Optional annotations to apply to the component
39    pub annotations: Option<HashMap<String, String>>,
40    /// List of named configuration to apply to the component, may be empty
41    pub config: Vec<String>,
42    /// Whether to wait for the component to scale
43    pub skip_wait: bool,
44    /// The timeout for waiting for the component to scale
45    pub timeout_ms: Option<u64>,
46}
47
48/// Scale a Wasmcloud component on a given host
49pub async fn scale_component(
50    ScaleComponentArgs {
51        client,
52        host_id,
53        component_id,
54        component_ref,
55        max_instances,
56        annotations,
57        config,
58        skip_wait,
59        timeout_ms,
60    }: ScaleComponentArgs<'_>,
61) -> Result<ComponentScaledInfo> {
62    // If timeout isn't supplied, override with a longer timeout for starting component
63    let timeout_ms = timeout_ms.unwrap_or(DEFAULT_START_COMPONENT_TIMEOUT_MS);
64
65    // Create a receiver to use with the client
66    let mut receiver = client
67        .events_receiver(vec![
68            "component_scaled".to_string(),
69            "component_scale_failed".to_string(),
70        ])
71        .await
72        .map_err(boxed_err_to_anyhow)
73        .context("Failed to get lattice event channel")?;
74
75    let ack = client
76        .scale_component(
77            host_id,
78            component_ref,
79            component_id,
80            max_instances,
81            annotations.map(BTreeMap::from_iter),
82            config,
83        )
84        .await
85        .map_err(boxed_err_to_anyhow)?;
86
87    if !ack.succeeded() {
88        bail!("Operation failed: {}", ack.message());
89    }
90
91    // If skip_wait is specified, return incomplete information immediately
92    if skip_wait {
93        return Ok(ComponentScaledInfo {
94            host_id: host_id.into(),
95            component_ref: component_ref.into(),
96            component_id: component_id.into(),
97        });
98    }
99
100    // Wait for the component to start
101    let event = wait_for_component_scaled_event(
102        &mut receiver,
103        Duration::from_millis(timeout_ms),
104        host_id,
105        component_ref,
106    )
107    .await
108    .with_context(|| {
109        format!(
110            "Timed out waiting for start event for component [{component_ref}] on host [{host_id}]"
111        )
112    })?;
113
114    match event {
115        FindEventOutcome::Success(info) => Ok(info),
116        FindEventOutcome::Failure(err) => Err(err).with_context(|| {
117            format!("Failed to scale component [{component_id}] on host [{host_id}]",)
118        }),
119    }
120}
121
122pub async fn update_component(
123    client: &CtlClient,
124    host_id: &str,
125    component_id: &str,
126    component_ref: &str,
127) -> Result<CtlResponse<()>> {
128    client
129        .update_component(host_id, component_id, component_ref, None)
130        .await
131        .map_err(boxed_err_to_anyhow)
132}