use chrono::Utc;
use clap::Args;
use kube::api::DynamicObject;
use std::collections::HashMap;
use colored::Colorize;
use crate::{
cli::{
commands::render::{run_render_preflight, ClusterClientRequirement, RenderOptions, RenderPreflightOptions},
namespace_resolution::{adjust_duplicate_keys_for_namespace_resolution, resolve_manifest_namespaces},
},
kubernetes::{
ApplyOutcome, KubeClient, KubeRsClient, KubernetesReleaseStorage, ReleaseState, ReleaseStatus, ReleaseStorage,
ResourceKey, ResourceOrdering,
},
NylError, Result,
};
// Command-line arguments for the apply command.
//
// Plain `//` comments are used on purpose in this struct: clap's derive macros turn `///`
// doc comments into CLI help text, so adding them would change the program's help output.
#[derive(Args, Debug)]
pub struct ApplyArgs {
// Shared render options, flattened into this command's argument list.
#[command(flatten)]
pub common: RenderOptions,
// Release name; only consulted by `execute` when the rendered output contains no NylRelease.
#[arg(long)]
pub name: Option<String>,
// Release namespace; used by `execute` as the namespace-resolution hint and as the release
// namespace when the rendered output contains no NylRelease.
#[arg(long)]
pub namespace: Option<String>,
// Passed to render preflight as the context override.
#[arg(long)]
pub context: Option<String>,
// When set, `execute` merges the previous revision's resource keys into the new revision
// and skips the prune step.
#[arg(long)]
pub append_release: bool,
// When set, `execute` applies the manifests and returns without recording a release.
// Mutually exclusive with `append_release`, `name`, and `namespace`.
#[arg(long, conflicts_with_all = ["append_release", "name", "namespace"])]
pub no_release: bool,
}
/// Renders the manifests, applies them to the cluster, and records the result as a release.
///
/// Steps, in order:
/// 1. Run render preflight in online mode (a cluster client is required).
/// 2. Resolve manifest namespaces and re-key the duplicate map accordingly.
/// 3. Sort the manifests by priority and apply them one by one; a resource that fails to
///    apply is counted but does not stop the run.
/// 4. With `--no-release`: print the summary and return.
/// 5. Otherwise: save a new release revision, mark the previous revision as superseded, and
///    (unless `--append-release`) delete resources that were tracked by the previous revision
///    but are absent from this one.
///
/// Failures to mark the previous revision as superseded, or to load the previous revision
/// for merging/pruning, are logged as warnings and do not abort the command.
///
/// # Errors
///
/// * `NylError::Config` if preflight did not provide the Kubernetes clients, if no
///   `NylRelease` was rendered and `--name`/`--namespace` are missing, or if
///   `--append-release` is used while the previous revision is not in `Deployed` state.
/// * `NylError::Other` if at least one resource failed to apply.
/// * Any error propagated from preflight, namespace resolution, ordering, serialization, or
///   release storage.
// The function is a linear pipeline whose steps share a lot of local state; it is kept in one
// piece rather than split, hence the lint exemption.
#[allow(clippy::too_many_lines)]
pub async fn execute(args: ApplyArgs) -> Result<()> {
    // Namespace resolution and duplicate-key adjustment are switched off for preflight and
    // performed explicitly below.
    let preflight = run_render_preflight(RenderPreflightOptions {
        common: &args.common,
        offline: false,
        kube_version: None,
        kube_api_versions: &[],
        context_override: args.context.as_deref(),
        cluster_client_requirement: ClusterClientRequirement::Required,
        resolve_namespaces: false,
        release_namespace_hint: None,
        adjust_duplicate_keys: false,
    })
    .await?;

    let mut desired_manifests = preflight.manifests;
    let nyl_release = preflight.nyl_release;
    let mut duplicates = preflight.duplicates;
    let kube_client = preflight
        .kube_client
        .ok_or_else(|| NylError::Config("Kubernetes client unavailable in online mode".to_string()))?;
    let client = preflight
        .raw_client
        .ok_or_else(|| NylError::Config("Raw Kubernetes client unavailable in online mode".to_string()))?;

    if desired_manifests.is_empty() {
        tracing::info!("No manifests to apply");
        return Ok(());
    }

    // The NylRelease namespace takes precedence over `--namespace` as the resolution hint.
    let release_namespace_hint = nyl_release
        .as_ref()
        .map(|release| release.metadata.namespace.as_str())
        .or(args.namespace.as_deref());
    resolve_manifest_namespaces(&kube_client, &mut desired_manifests, release_namespace_hint).await?;
    duplicates =
        adjust_duplicate_keys_for_namespace_resolution(&kube_client, &duplicates, release_namespace_hint).await?;
    if !duplicates.is_empty() {
        print_duplicate_warning(&duplicates);
    }

    // Apply a sorted copy; the unsorted manifests are what gets stored in the release.
    let mut sorted_manifests = desired_manifests.clone();
    ResourceOrdering::sort_by_priority(&mut sorted_manifests)?;
    let apply_result = apply_sorted_manifests(&kube_client, &sorted_manifests).await?;

    if args.no_release {
        print_apply_summary(&apply_result.outcomes, None, &duplicates, apply_result.failed_count);
        if apply_result.failed_count > 0 {
            return Err(NylError::Other(format!(
                "Apply completed with {} error(s)",
                apply_result.failed_count
            )));
        }
        return Ok(());
    }

    let (release_name, release_namespace) = if let Some(ref release) = nyl_release {
        (release.metadata.name.clone(), release.metadata.namespace.clone())
    } else {
        let name = args.name.ok_or_else(|| {
            NylError::Config("No NylRelease resource found. Specify --name and --namespace".to_string())
        })?;
        let namespace = args.namespace.ok_or_else(|| {
            NylError::Config("No NylRelease resource found. Specify --name and --namespace".to_string())
        })?;
        (name, namespace)
    };

    let storage = KubernetesReleaseStorage::new(client);
    let revisions = storage.list_revisions(&release_name, &release_namespace).await?;
    let next_revision = revisions.iter().max().map_or(1, |r| r + 1);

    let mut release = ReleaseState {
        release_name: release_name.clone(),
        release_namespace: release_namespace.clone(),
        revision: next_revision,
        resource_keys: apply_result.resource_keys.clone(),
        manifest: manifests_to_yaml(&desired_manifests)?,
        status: ReleaseStatus::Rendered,
        rendered_at: Utc::now(),
        applied_at: None,
        error: None,
    };

    if args.append_release && next_revision > 1 {
        match storage
            .get_release(&release_name, &release_namespace, next_revision - 1)
            .await
        {
            Ok(Some(previous_release)) => {
                // NOTE(review): this check runs after the manifests were applied above, so
                // returning here leaves applied resources without a recorded revision.
                if previous_release.status != ReleaseStatus::Deployed {
                    return Err(NylError::Config(format!(
                        "Cannot use --append-release when previous release (revision {}) is in {:?} state. \
                         The previous release must be in Deployed state to safely merge resources.",
                        previous_release.revision, previous_release.status
                    )));
                }

                // Keys only present in the previous revision come first, then the current keys.
                let current_keys: std::collections::HashSet<_> = release.resource_keys.iter().cloned().collect();
                let mut merged_keys: Vec<_> = previous_release
                    .resource_keys
                    .iter()
                    .filter(|prev_key| !current_keys.contains(*prev_key))
                    .cloned()
                    .collect();
                let added_from_previous = merged_keys.len();
                merged_keys.extend(release.resource_keys.iter().cloned());

                let overlap = previous_release.resource_keys.len() - added_from_previous;
                if overlap > 0 {
                    tracing::info!(
                        "Append-release mode: merged {} from previous + {} current ({} overlap, {} total)",
                        added_from_previous,
                        release.resource_keys.len(),
                        overlap,
                        merged_keys.len()
                    );
                } else {
                    tracing::info!(
                        "Append-release mode: merged {} from previous + {} current ({} total)",
                        added_from_previous,
                        release.resource_keys.len(),
                        merged_keys.len()
                    );
                }
                release.resource_keys = merged_keys;
            }
            Ok(None) => {
                tracing::warn!(
                    "Append-release mode: no previous release found (revision {}), treating as initial apply",
                    next_revision - 1
                );
            }
            Err(e) => {
                // Report the real cause instead of claiming the revision does not exist.
                tracing::warn!(
                    "Append-release mode: failed to load previous release (revision {}): {}; treating as initial apply",
                    next_revision - 1,
                    e
                );
            }
        }
    }

    if apply_result.failed_count == 0 {
        release.status = ReleaseStatus::Deployed;
        release.applied_at = Some(Utc::now());
    } else {
        release.status = ReleaseStatus::Failed;
        release.error = Some(format!("{} resource(s) failed to apply", apply_result.failed_count));
    }

    ensure_namespace_exists(&kube_client, &release_namespace).await?;
    storage.save_release(&release).await?;

    if release.status == ReleaseStatus::Deployed && next_revision > 1 {
        let prev_revision = next_revision - 1;
        // Best effort: the new revision is already saved, so only warn on failure.
        if let Err(e) = storage
            .update_release_status(
                &release_name,
                &release_namespace,
                prev_revision,
                ReleaseStatus::Superseded,
                None,
            )
            .await
        {
            tracing::warn!(
                "Failed to mark release {} revision {} as superseded: {}",
                release_name,
                prev_revision,
                e
            );
        }
    }

    if !args.append_release && release.status == ReleaseStatus::Deployed && next_revision > 1 {
        match storage
            .get_release(&release_name, &release_namespace, next_revision - 1)
            .await
        {
            Ok(Some(previous_release)) => {
                let current_keys: std::collections::HashSet<_> = release.resource_keys.iter().collect();
                let to_prune: Vec<_> = previous_release
                    .resource_keys
                    .iter()
                    .filter(|k| !current_keys.contains(k))
                    .collect();
                if !to_prune.is_empty() {
                    println!("\nPruning {} resources...", to_prune.len());
                    for key in to_prune {
                        // A failed delete is reported but does not fail the command.
                        match kube_client
                            .delete_resource(&key.gvk, key.namespace.as_deref(), &key.name)
                            .await
                        {
                            Ok(()) => {
                                println!("  ✓ Deleted {}", key);
                            }
                            Err(e) => {
                                println!("  ✗ Failed to delete {}: {}", key, e);
                            }
                        }
                    }
                    println!();
                }
            }
            Ok(None) => {}
            Err(e) => {
                tracing::warn!(
                    "Skipping prune: failed to load previous release (revision {}): {}",
                    next_revision - 1,
                    e
                );
            }
        }
    }

    print_apply_summary(
        &apply_result.outcomes,
        Some(&release),
        &duplicates,
        apply_result.failed_count,
    );
    if apply_result.failed_count > 0 {
        return Err(NylError::Other(format!(
            "Apply completed with {} error(s)",
            apply_result.failed_count
        )));
    }
    Ok(())
}
/// Aggregated result of applying a list of manifests, as built by `apply_sorted_manifests`.
struct ApplyExecutionResult {
/// Outcome of every resource that was applied successfully, in apply order.
outcomes: Vec<ApplyOutcome>,
/// Number of resources whose apply call returned an error.
failed_count: usize,
/// Keys of the successfully applied resources (failed resources are not included).
resource_keys: Vec<ResourceKey>,
}
/// Applies `manifests` one at a time, in the order given.
///
/// A resource that fails to apply is reported on stdout and counted in `failed_count`; its
/// key is not recorded. Processing then continues with the next manifest.
///
/// # Errors
///
/// Returns early if a resource key cannot be derived from a manifest.
async fn apply_sorted_manifests(
    client: &KubeRsClient,
    manifests: &[serde_json::Value],
) -> Result<ApplyExecutionResult> {
    let mut result = ApplyExecutionResult {
        outcomes: Vec::new(),
        failed_count: 0,
        resource_keys: Vec::new(),
    };

    for manifest in manifests {
        // Derive the key first so that a failed apply can still be reported by name.
        let resource_key = ResourceKey::from_json_value(manifest)?;
        let outcome = match apply_manifest(client, manifest).await {
            Ok(outcome) => outcome,
            Err(err) => {
                let detail = format!("(failed to apply resource: {})", err);
                println!("{} {} {}", "✗".red().bold(), resource_key, detail.red());
                result.failed_count += 1;
                continue;
            }
        };
        result.outcomes.push(outcome);
        result.resource_keys.push(resource_key);
    }

    Ok(result)
}
/// Serializes each manifest as a YAML document and joins the documents with `---` lines.
///
/// # Errors
///
/// Returns `NylError::YamlEmit` for the first manifest that fails to serialize.
fn manifests_to_yaml(manifests: &[serde_json::Value]) -> Result<String> {
    let documents = manifests
        .iter()
        .map(|manifest| crate::yaml::serialize_yaml_document(manifest).map_err(NylError::YamlEmit))
        .collect::<Result<Vec<_>>>()?;
    Ok(documents.join("---\n"))
}
/// Converts one JSON manifest into a `DynamicObject` and applies it through `client`.
///
/// # Errors
///
/// Fails if the manifest cannot be deserialized or if the apply call fails.
async fn apply_manifest(client: &KubeRsClient, manifest: &serde_json::Value) -> Result<ApplyOutcome> {
    let object = serde_json::from_value::<DynamicObject>(manifest.clone())?;
    let outcome = client.apply_resource(&object, "nyl", false).await;
    outcome
}
/// Prints the result of an apply run to stdout.
///
/// Output consists of one line per outcome, a blank line, a `Summary:` line with the
/// created/updated/unchanged counts (plus ignored duplicates and failures when non-zero),
/// and, when `release` is given, a line stating whether the release revision was deployed.
///
/// * `outcomes` - outcomes of the successfully applied resources.
/// * `release` - the recorded release, or `None` when release tracking is disabled.
/// * `duplicates` - occurrence count per duplicated resource key.
/// * `failed_count` - number of resources that failed to apply.
fn print_apply_summary(
    outcomes: &[ApplyOutcome],
    release: Option<&ReleaseState>,
    duplicates: &HashMap<ResourceKey, usize>,
    failed_count: usize,
) {
    // The per-outcome line format lives in `print_single_outcome`; reuse it instead of
    // keeping a second copy here.
    for outcome in outcomes {
        print_single_outcome(outcome, duplicates);
    }
    println!();

    let mut created = 0;
    let mut updated = 0;
    let mut unchanged = 0;
    for outcome in outcomes {
        match outcome {
            ApplyOutcome::Created { .. } => created += 1,
            ApplyOutcome::Updated { .. } => updated += 1,
            ApplyOutcome::Unchanged { .. } => unchanged += 1,
            // A dry-run outcome is counted as whatever it wraps.
            ApplyOutcome::DryRun { would_be } => match **would_be {
                ApplyOutcome::Created { .. } => created += 1,
                ApplyOutcome::Updated { .. } => updated += 1,
                ApplyOutcome::Unchanged { .. } => unchanged += 1,
                // A dry run wrapping another dry run is not counted.
                ApplyOutcome::DryRun { .. } => {}
            },
        }
    }

    // Each entry counts all occurrences of a key; all but one were ignored.
    let total_duplicates_ignored: usize = duplicates.values().map(|count| count - 1).sum();

    let mut parts = vec![
        format!("{} created", created.to_string().green()),
        format!("{} updated", updated.to_string().yellow()),
        format!("{} unchanged", unchanged),
    ];
    if total_duplicates_ignored > 0 {
        let plural = if total_duplicates_ignored == 1 {
            "duplicate"
        } else {
            "duplicates"
        };
        parts.push(format!(
            "{} {} ignored",
            total_duplicates_ignored.to_string().bright_black(),
            plural
        ));
    }
    if failed_count > 0 {
        parts.push(format!("{} failed", failed_count.to_string().red()));
    }
    println!("Summary: {}", parts.join(", "));

    if let Some(release) = release {
        println!();
        if release.status == ReleaseStatus::Deployed {
            println!(
                "Release: {} revision {} deployed successfully to namespace {}",
                release.release_name, release.revision, release.release_namespace
            );
        } else {
            println!("Release: {} revision {} failed", release.release_name, release.revision);
        }
    }
}
/// Prints one line for `outcome`: a coloured marker (`+` created, `~` updated, `=` unchanged),
/// the resource kind, its `namespace/name`, and a duplicate annotation when the key is in
/// `duplicates`. A dry-run outcome is printed as the outcome it wraps.
fn print_single_outcome(outcome: &ApplyOutcome, duplicates: &HashMap<ResourceKey, usize>) {
    let (marker, key) = match outcome {
        ApplyOutcome::DryRun { would_be } => {
            print_single_outcome(would_be, duplicates);
            return;
        }
        ApplyOutcome::Created { resource_key } => ("+".green().bold(), resource_key),
        ApplyOutcome::Updated { resource_key } => ("~".yellow().bold(), resource_key),
        ApplyOutcome::Unchanged { resource_key } => ("=".bright_black().bold(), resource_key),
    };

    let qualified_name = format_namespace_name(outcome.namespace(), outcome.name());
    let annotation = get_duplicate_annotation(key, duplicates);
    println!("{} {} {}{}", marker, outcome.kind(), qualified_name, annotation);
}
/// Returns `namespace/name` when a namespace is given, otherwise just `name`.
fn format_namespace_name(namespace: Option<&str>, name: &str) -> String {
    match namespace {
        Some(ns) => [ns, name].join("/"),
        None => name.to_owned(),
    }
}
/// Logs a single warning summarising the duplicated resources; does nothing for an empty map.
///
/// Each map value is the number of occurrences of a key, so `count - 1` of them were ignored.
fn print_duplicate_warning(duplicates: &HashMap<ResourceKey, usize>) {
    if !duplicates.is_empty() {
        let ignored_total = duplicates.values().map(|count| count - 1).sum::<usize>();
        tracing::warn!(
            "Found {} unique resources with duplicates ({} total duplicates ignored, keeping last occurrence)",
            duplicates.len(),
            ignored_total
        );
    }
}
/// Returns a yellow ` (N duplicate(s) ignored)` suffix for a key present in `duplicates`,
/// or an empty string when the key has no entry.
fn get_duplicate_annotation(resource_key: &ResourceKey, duplicates: &HashMap<ResourceKey, usize>) -> String {
    match duplicates.get(resource_key) {
        None => String::new(),
        Some(count) => {
            // The stored count includes the occurrence that was kept.
            let ignored = count - 1;
            let noun = if ignored == 1 { "duplicate" } else { "duplicates" };
            let note = format!("({} {} ignored)", ignored, noun);
            format!(" {}", note.yellow())
        }
    }
}
/// Makes sure `namespace` exists in the cluster, creating it when the lookup finds nothing.
///
/// Called before the release is saved so that the release state has a namespace to live in.
///
/// # Errors
///
/// Fails if the lookup or the apply of the new `Namespace` object fails.
async fn ensure_namespace_exists(client: &KubeRsClient, namespace: &str) -> Result<()> {
    use crate::kubernetes::GroupVersionKind;
    use kube::api::DynamicObject;
    use serde_json::json;

    let namespace_gvk = GroupVersionKind::from_api_version_kind("v1", "Namespace")?;
    let existing = client.get_resource(&namespace_gvk, None, namespace).await?;
    if existing.is_some() {
        return Ok(());
    }

    tracing::warn!(
        "Namespace '{}' does not exist. Creating it to store release state.",
        namespace
    );
    let manifest = json!({
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {
            "name": namespace
        }
    });
    let namespace_object: DynamicObject = serde_json::from_value(manifest)?;
    client.apply_resource(&namespace_object, "nyl", false).await?;
    tracing::info!("Created namespace '{}'", namespace);
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Two manifests end up in one string that contains both names and a document separator.
    #[test]
    fn test_manifests_to_yaml() {
        let config_map = |name: &str| {
            json!({
                "apiVersion": "v1",
                "kind": "ConfigMap",
                "metadata": {"name": name}
            })
        };
        let documents = vec![config_map("test1"), config_map("test2")];

        let rendered = manifests_to_yaml(&documents).unwrap();

        for needle in ["test1", "test2", "---"] {
            assert!(rendered.contains(needle), "missing {:?} in rendered YAML", needle);
        }
    }

    /// A namespaced resource is rendered as `namespace/name`.
    #[test]
    fn test_format_namespace_name_with_namespace() {
        let formatted = format_namespace_name(Some("default"), "myapp");
        assert_eq!(formatted, "default/myapp");
    }

    /// A resource without a namespace is rendered as its bare name.
    #[test]
    fn test_format_namespace_name_without_namespace() {
        let formatted = format_namespace_name(None, "mynamespace");
        assert_eq!(formatted, "mynamespace");
    }
}