1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use std::collections::HashMap;

use anyhow::{bail, Context, Result};
use tokio::time::Duration;
use wasmcloud_control_interface::{Client as CtlClient, CtlResponse};

use crate::{
    common::boxed_err_to_anyhow,
    config::DEFAULT_START_COMPONENT_TIMEOUT_MS,
    wait::{wait_for_component_scaled_event, FindEventOutcome},
};

/// Information related to a component scale
pub struct ComponentScaledInfo {
    pub host_id: String,
    pub component_ref: String,
    pub component_id: String,
}

/// Arguments required when scaling an component
///
/// # Properties
/// * `client` - The control interface client
/// * `host_id` - The ID of the host where the component is running
/// * `component_id` - The ID of the component to scale
/// * `component_ref` - The reference of the component to scale
/// * `max_instances` - The maximum number of instances to scale to
/// * `annotations` - Optional annotations to apply to the component
pub struct ScaleComponentArgs<'a> {
    /// The control interface client
    pub client: &'a CtlClient,
    /// The ID of the host where the component is running
    pub host_id: &'a str,
    /// The ID of the component to scale
    pub component_id: &'a str,
    /// The reference of the component to scale
    pub component_ref: &'a str,
    /// The maximum number of instances to scale to
    pub max_instances: u32,
    /// Optional annotations to apply to the component
    pub annotations: Option<HashMap<String, String>>,
    /// List of named configuration to apply to the component, may be empty
    pub config: Vec<String>,
    /// Whether to wait for the component to scale
    pub skip_wait: bool,
    /// The timeout for waiting for the component to scale
    pub timeout_ms: Option<u64>,
}

/// Scale a Wasmcloud component on a given host
pub async fn scale_component(
    ScaleComponentArgs {
        client,
        host_id,
        component_id,
        component_ref,
        max_instances,
        annotations,
        config,
        skip_wait,
        timeout_ms,
    }: ScaleComponentArgs<'_>,
) -> Result<ComponentScaledInfo> {
    // If timeout isn't supplied, override with a longer timeout for starting component
    let timeout_ms = timeout_ms.unwrap_or(DEFAULT_START_COMPONENT_TIMEOUT_MS);

    // Create a receiver to use with the client
    let mut receiver = client
        .events_receiver(vec![
            "component_scaled".to_string(),
            "component_scale_failed".to_string(),
        ])
        .await
        .map_err(boxed_err_to_anyhow)
        .context("Failed to get lattice event channel")?;

    let ack = client
        .scale_component(
            host_id,
            component_ref,
            component_id,
            max_instances,
            annotations,
            config,
        )
        .await
        .map_err(boxed_err_to_anyhow)?;

    if !ack.success {
        bail!("Operation failed: {}", ack.message);
    }

    // If skip_wait is specified, return incomplete information immediately
    if skip_wait {
        return Ok(ComponentScaledInfo {
            host_id: host_id.into(),
            component_ref: component_ref.into(),
            component_id: component_id.into(),
        });
    }

    // Wait for the component to start
    let event = wait_for_component_scaled_event(
        &mut receiver,
        Duration::from_millis(timeout_ms),
        host_id,
        component_ref,
    )
    .await
    .with_context(|| {
        format!(
            "Timed out waiting for start event for component [{component_ref}] on host [{host_id}]"
        )
    })?;

    match event {
        FindEventOutcome::Success(info) => Ok(info),
        FindEventOutcome::Failure(err) => Err(err).with_context(|| {
            format!("Failed to scale component [{component_id}] on host [{host_id}]",)
        }),
    }
}

pub async fn update_component(
    client: &CtlClient,
    host_id: &str,
    component_id: &str,
    component_ref: &str,
) -> Result<CtlResponse<()>> {
    client
        .update_component(host_id, component_id, component_ref, None)
        .await
        .map_err(boxed_err_to_anyhow)
}