use anyhow::{bail, Result};
use chrono::Utc;
use std::collections::BTreeMap;
use std::path::Path;
use std::time::SystemTime;
use tokio::process::Command;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::config::model::{ClusterDeployConfig, ClusterImageConfig};
use crate::orchestrator::state::ClusterDeployState;
/// Runs `cmd` with `args` and waits for it to exit or for `cancel` to fire.
///
/// * `working_dir` - directory to run the command in, if any.
/// * `env` - a single extra environment variable (`key`, `value`) to set.
/// * `cancel` - token that aborts the wait; the child is killed when the
///   pending command is dropped.
///
/// The command's output is captured rather than inherited.
///
/// # Errors
///
/// Returns `"cancelled"` if `cancel` has fired (before or during the run),
/// the I/O error if the process could not be run, or a message containing the
/// trimmed stderr if the process exits unsuccessfully.
async fn run_cmd(
    cmd: &str,
    args: &[&str],
    working_dir: Option<&Path>,
    env: Option<(&str, &Path)>,
    cancel: &CancellationToken,
) -> Result<()> {
    // Never launch a new process once cancellation has been requested.
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    let mut command = Command::new(cmd);
    command.args(args);
    // Without this, dropping the pending future on cancellation would leave
    // the child process running in the background.
    command.kill_on_drop(true);
    if let Some(dir) = working_dir {
        command.current_dir(dir);
    }
    if let Some((key, value)) = env {
        command.env(key, value);
    }

    let child = command.output();
    let output = tokio::select! {
        result = child => result?,
        _ = cancel.cancelled() => {
            bail!("cancelled");
        }
    };

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        bail!(
            "{} {} failed: {}",
            cmd,
            args.first().unwrap_or(&""),
            stderr.trim()
        );
    }
    Ok(())
}
use crate::platform;
/// Assembles the argument list for `docker build`.
///
/// The result is `build -t <tag> -f <dockerfile>`, then `--no-cache` when
/// requested, one `--secret <value>` pair per entry of `secret_args`, one
/// `--build-arg <value>` pair per entry of `build_args`, and finally `.` as
/// the build context.
fn docker_build_args<'a>(
    tag: &'a str,
    dockerfile: &'a str,
    secret_args: &'a [String],
    build_args: &'a [String],
    no_cache: bool,
) -> Vec<&'a str> {
    let mut argv: Vec<&'a str> = Vec::new();
    argv.extend(["build", "-t", tag, "-f", dockerfile]);
    if no_cache {
        argv.push("--no-cache");
    }
    argv.extend(
        secret_args
            .iter()
            .flat_map(|secret| ["--secret", secret.as_str()]),
    );
    argv.extend(
        build_args
            .iter()
            .flat_map(|build_arg| ["--build-arg", build_arg.as_str()]),
    );
    argv.push(".");
    argv
}
/// Formats each build secret as an `id=<id>,src=<path>` string, passing the
/// path through `platform::expand_home` first.
///
/// Entries are produced in the map's key order.
fn format_secret_args(build_secrets: &BTreeMap<String, String>) -> Vec<String> {
    let mut formatted = Vec::with_capacity(build_secrets.len());
    for (id, path) in build_secrets {
        formatted.push(format!("id={id},src={}", platform::expand_home(path)));
    }
    formatted
}
/// Formats each build argument as `KEY=value`, after substituting image-tag
/// placeholders in the value via `interpolate_image_refs`.
///
/// Entries are produced in the map's key order.
fn format_build_args(
    build_args: &BTreeMap<String, String>,
    deployed: &BTreeMap<String, ClusterDeployState>,
) -> Vec<String> {
    let mut formatted = Vec::with_capacity(build_args.len());
    for (key, raw_value) in build_args {
        let interpolated = interpolate_image_refs(raw_value, deployed);
        formatted.push(format!("{key}={interpolated}"));
    }
    formatted
}
/// Replaces every `{{ cluster.image.<name>.tag }}` placeholder in `value`
/// with the `image_tag` of the matching entry in `deployed`.
///
/// Placeholders naming an image that is not in `deployed` are left untouched.
fn interpolate_image_refs(value: &str, deployed: &BTreeMap<String, ClusterDeployState>) -> String {
    deployed
        .iter()
        .fold(value.to_owned(), |text, (name, state)| {
            let placeholder = format!("{{{{ cluster.image.{name}.tag }}}}");
            text.replace(&placeholder, &state.image_tag)
        })
}
/// Builds the image for deploy target `name`, pushes it when a registry port
/// is given, and applies the target's manifests with `kubectl apply -f`.
///
/// With a registry the tag is `localhost:<port>/<name>:<unix-seconds>`;
/// without one it is `devrig-<name>:latest` and nothing is pushed. The build
/// context and manifests paths are resolved relative to `config_dir`, and
/// `kubectl` runs with `KUBECONFIG` set to `kubeconfig_path`.
///
/// Returns the state recording the built tag and the current time.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if any command fails,
/// or with `"cancelled"` when `cancel` fires during or between steps.
pub async fn run_deploy(
    name: &str,
    deploy_config: &ClusterDeployConfig,
    registry_port: Option<u16>,
    kubeconfig_path: &Path,
    config_dir: &Path,
    cancel: &CancellationToken,
) -> Result<ClusterDeployState> {
    let tag = match registry_port {
        Some(port) => {
            let timestamp = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)?
                .as_secs();
            format!("localhost:{port}/{name}:{timestamp}")
        }
        None => format!("devrig-{name}:latest"),
    };

    // Step 1: docker build, run from inside the build context directory.
    debug!(name, tag, "building image");
    let build_context = config_dir.join(&deploy_config.context);
    let secrets = format_secret_args(&deploy_config.build_secrets);
    let build_argv = docker_build_args(&tag, &deploy_config.dockerfile, &secrets, &[], false);
    run_cmd("docker", &build_argv, Some(&build_context), None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    // Step 2: push, only when a registry is configured.
    if registry_port.is_some() {
        debug!(name, tag, "pushing image");
        let push_argv = ["push", tag.as_str()];
        run_cmd("docker", &push_argv, None, None, cancel).await?;
        if cancel.is_cancelled() {
            bail!("cancelled");
        }
    }

    // Step 3: apply the manifests against the cluster.
    let manifests_path = config_dir.join(&deploy_config.manifests);
    let manifest_arg = manifests_path.to_string_lossy();
    debug!(name, manifests = %manifest_arg, "applying manifests");
    let apply_argv = ["apply", "-f", &*manifest_arg];
    let kube_env = Some(("KUBECONFIG", kubeconfig_path));
    run_cmd("kubectl", &apply_argv, None, kube_env, cancel).await?;

    Ok(ClusterDeployState {
        image_tag: tag,
        last_deployed: Utc::now(),
    })
}
/// Rebuilds the image for deploy target `name`, re-applies its manifests and
/// restarts `deployment/<name>` with `kubectl rollout restart`.
///
/// Steps run in order: `docker build`, `docker push` (only when
/// `registry_port` is set), `kubectl apply -f`, `kubectl rollout restart`.
/// With a registry the tag is `localhost:<port>/<name>:<unix-seconds>`;
/// without one it is `devrig-<name>:latest`. Both `kubectl` calls run with
/// `KUBECONFIG` set to `kubeconfig_path`.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if any command fails,
/// or with `"cancelled"` when `cancel` fires during or between steps.
pub async fn run_rebuild(
    name: &str,
    deploy_config: &ClusterDeployConfig,
    registry_port: Option<u16>,
    kubeconfig_path: &Path,
    config_dir: &Path,
    cancel: &CancellationToken,
) -> Result<()> {
    let tag = match registry_port {
        Some(port) => {
            let timestamp = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)?
                .as_secs();
            format!("localhost:{port}/{name}:{timestamp}")
        }
        None => format!("devrig-{name}:latest"),
    };

    // Step 1: docker build, run from inside the build context directory.
    debug!(name, tag, "rebuilding image");
    let build_context = config_dir.join(&deploy_config.context);
    let secrets = format_secret_args(&deploy_config.build_secrets);
    let build_argv = docker_build_args(&tag, &deploy_config.dockerfile, &secrets, &[], false);
    run_cmd("docker", &build_argv, Some(&build_context), None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    // Step 2: push, only when a registry is configured.
    if registry_port.is_some() {
        debug!(name, tag, "pushing image");
        let push_argv = ["push", tag.as_str()];
        run_cmd("docker", &push_argv, None, None, cancel).await?;
        if cancel.is_cancelled() {
            bail!("cancelled");
        }
    }

    // Both kubectl invocations share the same environment override.
    let kube_env = Some(("KUBECONFIG", kubeconfig_path));

    // Step 3: apply the manifests.
    let manifests_path = config_dir.join(&deploy_config.manifests);
    let manifest_arg = manifests_path.to_string_lossy();
    debug!(name, manifests = %manifest_arg, "applying manifests");
    let apply_argv = ["apply", "-f", &*manifest_arg];
    run_cmd("kubectl", &apply_argv, None, kube_env, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    // Step 4: restart the deployment named after the target.
    let deployment = format!("deployment/{name}");
    debug!(name, "restarting deployment");
    let restart_argv = ["rollout", "restart", deployment.as_str()];
    run_cmd("kubectl", &restart_argv, None, kube_env, cancel).await?;

    Ok(())
}
/// Builds the Docker image `name` from `image_config` and, when a registry
/// port is given, pushes it under both a timestamped tag and `latest`.
///
/// With a registry the primary tag is
/// `localhost:<port>/<name>:<unix-seconds>`; without one it is
/// `devrig-<name>:latest` and nothing is pushed. Build-arg values are
/// formatted with `format_build_args` using `deployed`, and the build context
/// is resolved relative to `config_dir`.
///
/// Returns the state recording the primary tag and the current time.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if any `docker`
/// command fails, or with `"cancelled"` when `cancel` fires during or between
/// steps.
pub async fn run_image_build(
    name: &str,
    image_config: &ClusterImageConfig,
    registry_port: Option<u16>,
    config_dir: &Path,
    deployed: &BTreeMap<String, ClusterDeployState>,
    cancel: &CancellationToken,
) -> Result<ClusterDeployState> {
    let context_path = config_dir.join(&image_config.context);
    let tag = if let Some(port) = registry_port {
        let timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();
        format!("localhost:{port}/{name}:{timestamp}")
    } else {
        format!("devrig-{name}:latest")
    };

    debug!(name, tag, "building image");
    let secret_args = format_secret_args(&image_config.build_secrets);
    let build_args = format_build_args(&image_config.build_args, deployed);
    let args = docker_build_args(&tag, &image_config.dockerfile, &secret_args, &build_args, false);
    run_cmd("docker", &args, Some(&context_path), None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    if let Some(port) = registry_port {
        debug!(name, tag, "pushing image");
        run_cmd("docker", &["push", &tag], None, None, cancel).await?;
        // Re-check between steps, as after the build, so a cancelled run does
        // not go on to start further docker commands.
        if cancel.is_cancelled() {
            bail!("cancelled");
        }

        let latest_tag = format!("localhost:{port}/{name}:latest");
        run_cmd("docker", &["tag", &tag, &latest_tag], None, None, cancel).await?;
        if cancel.is_cancelled() {
            bail!("cancelled");
        }
        run_cmd("docker", &["push", &latest_tag], None, None, cancel).await?;
    }

    Ok(ClusterDeployState {
        image_tag: tag,
        last_deployed: Utc::now(),
    })
}
/// Rebuilds the Docker image `name` from `image_config` and, when a registry
/// port is given, pushes it under both a timestamped tag and `latest`.
///
/// With a registry the primary tag is
/// `localhost:<port>/<name>:<unix-seconds>`; without one it is
/// `devrig-<name>:latest` and nothing is pushed. Build-arg values are
/// formatted with `format_build_args` using `deployed`, and the build context
/// is resolved relative to `config_dir`.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if any `docker`
/// command fails, or with `"cancelled"` when `cancel` fires during or between
/// steps.
pub async fn rebuild_image(
    name: &str,
    image_config: &ClusterImageConfig,
    registry_port: Option<u16>,
    config_dir: &Path,
    deployed: &BTreeMap<String, ClusterDeployState>,
    cancel: &CancellationToken,
) -> Result<()> {
    let context_path = config_dir.join(&image_config.context);
    let tag = if let Some(port) = registry_port {
        let timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();
        format!("localhost:{port}/{name}:{timestamp}")
    } else {
        format!("devrig-{name}:latest")
    };

    debug!(name, tag, "rebuilding image");
    let secret_args = format_secret_args(&image_config.build_secrets);
    let build_args = format_build_args(&image_config.build_args, deployed);
    let args = docker_build_args(&tag, &image_config.dockerfile, &secret_args, &build_args, false);
    run_cmd("docker", &args, Some(&context_path), None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    if let Some(port) = registry_port {
        debug!(name, tag, "pushing image");
        run_cmd("docker", &["push", &tag], None, None, cancel).await?;
        // Re-check between steps, as after the build, so a cancelled run does
        // not go on to start further docker commands.
        if cancel.is_cancelled() {
            bail!("cancelled");
        }

        let latest_tag = format!("localhost:{port}/{name}:latest");
        run_cmd("docker", &["tag", &tag, &latest_tag], None, None, cancel).await?;
        if cancel.is_cancelled() {
            bail!("cancelled");
        }
        run_cmd("docker", &["push", &latest_tag], None, None, cancel).await?;
    }

    Ok(())
}
/// Rebuilds the Docker image `name` with `--no-cache` and pushes it to the
/// local registry under both a timestamped tag and `latest`.
///
/// The primary tag is `localhost:<registry_port>/<name>:<unix-seconds>`.
/// Build-arg values are formatted with `format_build_args` using `deployed`,
/// and the build context is resolved relative to `config_dir`. Progress lines
/// are printed to stdout.
///
/// Returns the state recording the primary tag and the current time.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if any `docker`
/// command fails, or with `"cancelled"` when `cancel` fires during or between
/// steps.
pub async fn fresh_rebuild_image(
    name: &str,
    image_config: &ClusterImageConfig,
    registry_port: u16,
    config_dir: &Path,
    deployed: &BTreeMap<String, ClusterDeployState>,
    cancel: &CancellationToken,
) -> Result<ClusterDeployState> {
    let context_path = config_dir.join(&image_config.context);
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)?
        .as_secs();
    let tag = format!("localhost:{registry_port}/{name}:{timestamp}");

    println!(" Building image '{name}' (--no-cache)...");
    debug!(name, tag, "fresh building image with --no-cache");
    let secret_args = format_secret_args(&image_config.build_secrets);
    let build_args = format_build_args(&image_config.build_args, deployed);
    let args = docker_build_args(&tag, &image_config.dockerfile, &secret_args, &build_args, true);
    run_cmd("docker", &args, Some(&context_path), None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    debug!(name, tag, "pushing image");
    run_cmd("docker", &["push", &tag], None, None, cancel).await?;
    // Re-check between steps, as after the build, so a cancelled run does not
    // go on to start further docker commands.
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    let latest_tag = format!("localhost:{registry_port}/{name}:latest");
    run_cmd("docker", &["tag", &tag, &latest_tag], None, None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }
    run_cmd("docker", &["push", &latest_tag], None, None, cancel).await?;
    println!(" Pushed '{name}' -> {tag}");

    Ok(ClusterDeployState {
        image_tag: tag,
        last_deployed: Utc::now(),
    })
}
/// Rebuilds the image for deploy target `name` with `--no-cache`, pushes it
/// to the local registry under a timestamped tag and `latest`, and optionally
/// re-applies the manifests and restarts `deployment/<name>`.
///
/// The primary tag is `localhost:<registry_port>/<name>:<unix-seconds>`.
/// When `apply_manifests` is true, `kubectl apply -f` and
/// `kubectl rollout restart` run with `KUBECONFIG` set to `kubeconfig_path`;
/// otherwise the function returns right after the pushes. The build context
/// and manifests paths are resolved relative to `config_dir`. Progress lines
/// are printed to stdout.
///
/// Returns the state recording the primary tag and the current time.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch, if any command fails,
/// or with `"cancelled"` when `cancel` fires during or between steps.
pub async fn fresh_rebuild_deploy(
    name: &str,
    deploy_config: &ClusterDeployConfig,
    registry_port: u16,
    kubeconfig_path: &Path,
    config_dir: &Path,
    apply_manifests: bool,
    cancel: &CancellationToken,
) -> Result<ClusterDeployState> {
    let context_path = config_dir.join(&deploy_config.context);
    let manifests_path = config_dir.join(&deploy_config.manifests);
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)?
        .as_secs();
    let tag = format!("localhost:{registry_port}/{name}:{timestamp}");

    println!(" Building deploy '{name}' (--no-cache)...");
    debug!(name, tag, "fresh building deploy image with --no-cache");
    let secret_args = format_secret_args(&deploy_config.build_secrets);
    let args = docker_build_args(&tag, &deploy_config.dockerfile, &secret_args, &[], true);
    run_cmd("docker", &args, Some(&context_path), None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    debug!(name, tag, "pushing image");
    run_cmd("docker", &["push", &tag], None, None, cancel).await?;
    // Re-check between steps, as after the build, so a cancelled run does not
    // go on to start further commands.
    if cancel.is_cancelled() {
        bail!("cancelled");
    }

    let latest_tag = format!("localhost:{registry_port}/{name}:latest");
    run_cmd("docker", &["tag", &tag, &latest_tag], None, None, cancel).await?;
    if cancel.is_cancelled() {
        bail!("cancelled");
    }
    run_cmd("docker", &["push", &latest_tag], None, None, cancel).await?;
    println!(" Pushed '{name}' -> {tag}");

    if apply_manifests {
        // Checked only here (not unconditionally after the push) so that a
        // run with nothing left to do still reports its successful push.
        if cancel.is_cancelled() {
            bail!("cancelled");
        }

        let manifests_str = manifests_path.to_string_lossy();
        debug!(name, manifests = %manifests_str, "applying manifests");
        run_cmd(
            "kubectl",
            &["apply", "-f", &manifests_str],
            None,
            Some(("KUBECONFIG", kubeconfig_path)),
            cancel,
        )
        .await?;
        if cancel.is_cancelled() {
            bail!("cancelled");
        }

        let deployment = format!("deployment/{name}");
        debug!(name, "restarting deployment");
        run_cmd(
            "kubectl",
            &["rollout", "restart", &deployment],
            None,
            Some(("KUBECONFIG", kubeconfig_path)),
            cancel,
        )
        .await?;
        println!(" Applied manifests and restarted deployment '{name}'");
    }

    Ok(ClusterDeployState {
        image_tag: tag,
        last_deployed: Utc::now(),
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Returns a deployed-state map holding one image `name` tagged `image_tag`.
    fn deployed_with(name: &str, image_tag: &str) -> BTreeMap<String, ClusterDeployState> {
        let state = ClusterDeployState {
            image_tag: image_tag.to_string(),
            last_deployed: Utc::now(),
        };
        BTreeMap::from([(name.to_string(), state)])
    }

    // A placeholder naming a deployed image is replaced by that image's tag.
    #[test]
    fn interpolate_image_refs_replaces_tags() {
        let deployed = deployed_with("bloom", "localhost:12345/bloom:1700000000");
        assert_eq!(
            interpolate_image_refs("{{ cluster.image.bloom.tag }}", &deployed),
            "localhost:12345/bloom:1700000000"
        );
    }

    // Text without placeholders passes through untouched.
    #[test]
    fn interpolate_image_refs_no_match_unchanged() {
        let unchanged = interpolate_image_refs("some-static-value", &BTreeMap::new());
        assert_eq!(unchanged, "some-static-value");
    }

    // Values are interpolated first, then rendered as `KEY=value`.
    #[test]
    fn format_build_args_interpolates_and_formats() {
        let build_args = BTreeMap::from([
            (
                "SERVER_IMAGE".to_string(),
                "{{ cluster.image.bloom.tag }}".to_string(),
            ),
            ("STATIC_ARG".to_string(), "hello".to_string()),
        ]);
        let deployed = deployed_with("bloom", "localhost:5000/bloom:123");

        let formatted = format_build_args(&build_args, &deployed);
        for expected in ["SERVER_IMAGE=localhost:5000/bloom:123", "STATIC_ARG=hello"] {
            assert!(formatted.iter().any(|arg| arg == expected));
        }
    }

    // Build args appear behind a `--build-arg` flag; `--no-cache` is opt-in.
    #[test]
    fn docker_build_args_includes_build_args() {
        let build_args = vec!["SERVER_IMAGE=foo:latest".to_string()];
        let argv = docker_build_args("tag:1", "Dockerfile", &[], &build_args, false);
        assert!(argv.contains(&"--build-arg"));
        assert!(argv.contains(&"SERVER_IMAGE=foo:latest"));
        assert!(!argv.contains(&"--no-cache"));
    }

    // Requesting a fresh build adds `--no-cache`.
    #[test]
    fn docker_build_args_includes_no_cache() {
        let argv = docker_build_args("tag:1", "Dockerfile", &[], &[], true);
        assert!(argv.contains(&"--no-cache"));
    }
}