use std::collections::{BTreeMap, HashMap};
use anyhow::{bail, Context, Result};
use tokio::time::Duration;
use wasmcloud_control_interface::{Client as CtlClient, CtlResponse};
use crate::common::boxed_err_to_anyhow;
use crate::config::DEFAULT_START_COMPONENT_TIMEOUT_MS;
use crate::wait::{wait_for_component_scaled_event, FindEventOutcome};
/// Identifies a component that was scaled on a host.
///
/// Returned by [`scale_component`]: either echoed straight from the request arguments (when the
/// caller skips waiting) or taken from the scale event observed on the lattice.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComponentScaledInfo {
    /// ID of the host the scale request was addressed to.
    pub host_id: String,
    /// Reference of the component that was scaled.
    pub component_ref: String,
    /// ID of the component that was scaled.
    pub component_id: String,
}
/// Arguments consumed by [`scale_component`].
///
/// All borrowed fields share the lifetime `'a`; the struct is destructured directly in the
/// function's parameter list.
pub struct ScaleComponentArgs<'a> {
/// Control interface client used to subscribe to lattice events and to send the scale request.
pub client: &'a CtlClient,
/// ID of the host the scale request is sent to.
pub host_id: &'a str,
/// ID of the component, forwarded to the client's scale request.
pub component_id: &'a str,
/// Reference of the component, forwarded to the client's scale request and used to match the
/// resulting scale event.
pub component_ref: &'a str,
/// Instance limit forwarded to the client's scale request.
pub max_instances: u32,
/// Optional annotations; converted to a `BTreeMap` before being forwarded with the request.
pub annotations: Option<HashMap<String, String>>,
/// Configuration entries forwarded unchanged with the request.
// NOTE(review): presumably names of named configuration known to the lattice - confirm.
pub config: Vec<String>,
/// When `true`, return as soon as the request is acknowledged instead of waiting for the
/// scale event.
pub skip_wait: bool,
/// How long to wait for the scale event, in milliseconds. `None` falls back to
/// `DEFAULT_START_COMPONENT_TIMEOUT_MS`.
pub timeout_ms: Option<u64>,
}
/// Scale a component on a host and, unless `skip_wait` is set, wait for the lattice to report
/// the outcome.
///
/// The request is sent through the control interface client. When waiting, the function listens
/// for `component_scaled` / `component_scale_failed` events for up to `timeout_ms` milliseconds
/// (default: `DEFAULT_START_COMPONENT_TIMEOUT_MS`).
///
/// # Returns
///
/// * With `skip_wait`: a [`ComponentScaledInfo`] built from the request arguments, returned as
///   soon as the request has been acknowledged.
/// * Otherwise: the [`ComponentScaledInfo`] carried by the matching success event.
///
/// # Errors
///
/// Returns an error when:
/// * the lattice event channel cannot be opened (only attempted when waiting),
/// * sending the request fails or the acknowledgement reports failure,
/// * waiting for the scale event fails (e.g. the timeout elapses),
/// * the lattice reports that scaling failed.
pub async fn scale_component(
    ScaleComponentArgs {
        client,
        host_id,
        component_id,
        component_ref,
        max_instances,
        annotations,
        config,
        skip_wait,
        timeout_ms,
    }: ScaleComponentArgs<'_>,
) -> Result<ComponentScaledInfo> {
    // The subscription is opened before the request is sent, so it is already in place when
    // the outcome event is published. When the caller will not wait, nothing would ever read
    // from the channel, so no subscription is opened at all.
    let receiver = if skip_wait {
        None
    } else {
        Some(
            client
                .events_receiver(vec![
                    "component_scaled".to_string(),
                    "component_scale_failed".to_string(),
                ])
                .await
                .map_err(boxed_err_to_anyhow)
                .context("Failed to get lattice event channel")?,
        )
    };

    let ack = client
        .scale_component(
            host_id,
            component_ref,
            component_id,
            max_instances,
            annotations.map(BTreeMap::from_iter),
            config,
        )
        .await
        .map_err(boxed_err_to_anyhow)?;

    if !ack.succeeded() {
        bail!("Operation failed: {}", ack.message());
    }

    // No receiver means the caller asked not to wait: the acknowledgement is all we report on.
    let mut receiver = match receiver {
        Some(receiver) => receiver,
        None => {
            return Ok(ComponentScaledInfo {
                host_id: host_id.into(),
                component_ref: component_ref.into(),
                component_id: component_id.into(),
            })
        }
    };

    let timeout_ms = timeout_ms.unwrap_or(DEFAULT_START_COMPONENT_TIMEOUT_MS);
    let event = wait_for_component_scaled_event(
        &mut receiver,
        Duration::from_millis(timeout_ms),
        host_id,
        component_ref,
    )
    .await
    .with_context(|| {
        format!(
            "Timed out waiting for scale event for component [{component_ref}] on host [{host_id}]"
        )
    })?;

    match event {
        FindEventOutcome::Success(info) => Ok(info),
        FindEventOutcome::Failure(err) => Err(err).with_context(|| {
            format!("Failed to scale component [{component_id}] on host [{host_id}]")
        }),
    }
}
/// Send an update request for a component on a host through the control interface client.
///
/// `host_id`, `component_id` and `component_ref` are forwarded to the client in that order; the
/// client's optional trailing argument is left as `None`.
///
/// # Errors
///
/// Returns the client's error, converted to an [`anyhow::Error`], if the request fails. The
/// returned [`CtlResponse`] is passed back untouched, so callers must inspect it themselves to
/// learn whether the host accepted the update.
pub async fn update_component(
    client: &CtlClient,
    host_id: &str,
    component_id: &str,
    component_ref: &str,
) -> Result<CtlResponse<()>> {
    let response = client
        .update_component(host_id, component_id, component_ref, None)
        .await;

    response.map_err(boxed_err_to_anyhow)
}