1use std::collections::{BTreeMap, HashMap};
2
3use anyhow::{bail, Context, Result};
4use tokio::time::Duration;
5use wasmcloud_control_interface::{Client as CtlClient, CtlResponse};
6
7use crate::common::boxed_err_to_anyhow;
8use crate::config::DEFAULT_START_COMPONENT_TIMEOUT_MS;
9use crate::wait::{wait_for_component_scaled_event, FindEventOutcome};
10
11pub struct ComponentScaledInfo {
13 pub host_id: String,
14 pub component_ref: String,
15 pub component_id: String,
16}
17
18pub struct ScaleComponentArgs<'a> {
28 pub client: &'a CtlClient,
30 pub host_id: &'a str,
32 pub component_id: &'a str,
34 pub component_ref: &'a str,
36 pub max_instances: u32,
38 pub annotations: Option<HashMap<String, String>>,
40 pub config: Vec<String>,
42 pub skip_wait: bool,
44 pub timeout_ms: Option<u64>,
46}
47
48pub async fn scale_component(
50 ScaleComponentArgs {
51 client,
52 host_id,
53 component_id,
54 component_ref,
55 max_instances,
56 annotations,
57 config,
58 skip_wait,
59 timeout_ms,
60 }: ScaleComponentArgs<'_>,
61) -> Result<ComponentScaledInfo> {
62 let timeout_ms = timeout_ms.unwrap_or(DEFAULT_START_COMPONENT_TIMEOUT_MS);
64
65 let mut receiver = client
67 .events_receiver(vec![
68 "component_scaled".to_string(),
69 "component_scale_failed".to_string(),
70 ])
71 .await
72 .map_err(boxed_err_to_anyhow)
73 .context("Failed to get lattice event channel")?;
74
75 let ack = client
76 .scale_component(
77 host_id,
78 component_ref,
79 component_id,
80 max_instances,
81 annotations.map(BTreeMap::from_iter),
82 config,
83 )
84 .await
85 .map_err(boxed_err_to_anyhow)?;
86
87 if !ack.succeeded() {
88 bail!("Operation failed: {}", ack.message());
89 }
90
91 if skip_wait {
93 return Ok(ComponentScaledInfo {
94 host_id: host_id.into(),
95 component_ref: component_ref.into(),
96 component_id: component_id.into(),
97 });
98 }
99
100 let event = wait_for_component_scaled_event(
102 &mut receiver,
103 Duration::from_millis(timeout_ms),
104 host_id,
105 component_ref,
106 )
107 .await
108 .with_context(|| {
109 format!(
110 "Timed out waiting for start event for component [{component_ref}] on host [{host_id}]"
111 )
112 })?;
113
114 match event {
115 FindEventOutcome::Success(info) => Ok(info),
116 FindEventOutcome::Failure(err) => Err(err).with_context(|| {
117 format!("Failed to scale component [{component_id}] on host [{host_id}]",)
118 }),
119 }
120}
121
122pub async fn update_component(
123 client: &CtlClient,
124 host_id: &str,
125 component_id: &str,
126 component_ref: &str,
127) -> Result<CtlResponse<()>> {
128 client
129 .update_component(host_id, component_id, component_ref, None)
130 .await
131 .map_err(boxed_err_to_anyhow)
132}