use crate::error::{K8sError, Result};
use crate::utils::{with_retry, RetryConfig, RetryableError};
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::{Namespace, Pod};
use kube::api::{Api, ListParams, Patch, PatchParams};
use kube::config::{KubeConfigOptions, Kubeconfig};
use kube::{Client, Config};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
/// Newtype around [`K8sError`] so Kubernetes failures can carry the
/// [`RetryableError`] classification used by `with_retry` in this module.
#[derive(Debug)]
struct K8sRetryableError(K8sError);

impl std::fmt::Display for K8sRetryableError {
    /// Formats exactly as the wrapped [`K8sError`] does.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let K8sRetryableError(inner) = self;
        write!(f, "{}", inner)
    }
}
impl RetryableError for K8sRetryableError {
fn is_retryable(&self) -> bool {
match &self.0 {
K8sError::KubeApi(msg) => {
msg.contains("connection")
|| msg.contains("timeout")
|| msg.contains("503")
|| msg.contains("504")
|| msg.contains("429")
|| msg.contains("ETIMEDOUT")
|| msg.contains("temporarily unavailable")
}
K8sError::ClusterUnreachable(msg) => {
msg.contains("connection") || msg.contains("timeout") || msg.contains("refused")
}
K8sError::RolloutTimeout(_) => false,
K8sError::KubeconfigInvalid => false,
K8sError::ContextNotFound(_) => false,
K8sError::DeploymentNotFound(_, _) => false,
K8sError::NamespaceNotFound(_) => false,
K8sError::RolloutFailed(_) => false,
K8sError::ManifestError(_) => false,
K8sError::PermissionDenied(_) => false,
}
}
}
impl From<K8sRetryableError> for crate::error::ApiForgError {
fn from(e: K8sRetryableError) -> Self {
crate::error::ApiForgError::Kubernetes(e.0)
}
}
/// Kubernetes client bound to a single kubeconfig context, with a retry
/// policy used when issuing API calls.
pub struct K8sClient {
// Shared handle to the kube-rs client; methods clone the `Arc` into each retry attempt.
client: Arc<Client>,
// Context name this client was built for ("in-cluster" when built by `new_in_cluster`).
context: String,
// Retry policy passed to `with_retry` for the API operations.
retry_config: RetryConfig,
}
/// Snapshot of a Deployment's rollout progress.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly, e.g. to
/// detect whether progress changed between polls, or in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RolloutStatus {
    /// Whether the rollout is considered complete.
    pub ready: bool,
    /// Number of ready pods reported in the Deployment status.
    pub ready_replicas: i32,
    /// Desired replica count from the Deployment spec.
    pub desired_replicas: i32,
    /// Number of pods already running the latest pod template.
    pub updated_replicas: i32,
    /// Number of available pods reported in the Deployment status.
    pub available_replicas: i32,
    /// Human-readable progress description.
    pub message: String,
}
impl K8sClient {
/// Builds a client for the named `context` from the user's kubeconfig.
///
/// # Errors
/// - `KubeconfigInvalid` if the kubeconfig cannot be read.
/// - `ContextNotFound` if no config can be built for `context`. Every failure
///   at this step is reported this way.
/// - `ClusterUnreachable` if the HTTP client cannot be created from the config.
pub async fn new(context: &str) -> Result<Self> {
    let options = KubeConfigOptions {
        context: Some(context.to_string()),
        ..Default::default()
    };
    let kubeconfig = Kubeconfig::read().map_err(|_| K8sError::KubeconfigInvalid)?;
    let config = match Config::from_custom_kubeconfig(kubeconfig, &options).await {
        Ok(config) => config,
        Err(_) => return Err(K8sError::ContextNotFound(context.to_string()).into()),
    };
    let client = Client::try_from(config)
        .map(Arc::new)
        .map_err(|err| K8sError::ClusterUnreachable(err.to_string()))?;
    Ok(Self {
        client,
        context: context.to_string(),
        retry_config: RetryConfig::default(),
    })
}
/// Builds a client via `Client::try_default()` and labels its context
/// `"in-cluster"`.
///
/// NOTE(review): `try_default` *infers* its configuration. Per the kube-rs
/// docs, a local kubeconfig is tried before the in-cluster service-account
/// environment. Outside a pod this may therefore connect through
/// KUBECONFIG while still reporting the context as "in-cluster". Confirm
/// this is intended.
///
/// # Errors
/// `ClusterUnreachable` if no client can be constructed.
pub async fn new_in_cluster() -> Result<Self> {
    let client = match Client::try_default().await {
        Ok(client) => Arc::new(client),
        Err(err) => return Err(K8sError::ClusterUnreachable(err.to_string()).into()),
    };
    Ok(Self {
        client,
        context: String::from("in-cluster"),
        retry_config: RetryConfig::default(),
    })
}
pub async fn verify_connection(&self) -> Result<()> {
let _: Api<Namespace> = Api::all(self.client.as_ref().clone());
Ok(())
}
pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
let client = self.client.clone();
let namespace = namespace.to_string();
let retry_config = self.retry_config.clone();
let result = with_retry(&retry_config, "K8s namespace_exists", || {
let client = client.clone();
let namespace = namespace.clone();
async move {
let namespaces: Api<Namespace> = Api::all(client.as_ref().clone());
match namespaces.get(&namespace).await {
Ok(_) => Ok(true),
Err(kube::Error::Api(err)) if err.code == 404 => Ok(false),
Err(e) => Err(K8sRetryableError(K8sError::KubeApi(e.to_string()))),
}
}
})
.await?;
Ok(result)
}
pub async fn get_deployment(&self, namespace: &str, name: &str) -> Result<Deployment> {
let client = self.client.clone();
let namespace = namespace.to_string();
let name = name.to_string();
let retry_config = self.retry_config.clone();
let result = with_retry(&retry_config, "K8s get_deployment", || {
let client = client.clone();
let namespace = namespace.clone();
let name = name.clone();
async move {
let deployments: Api<Deployment> =
Api::namespaced(client.as_ref().clone(), &namespace);
deployments.get(&name).await.map_err(|e| match e {
kube::Error::Api(err) if err.code == 404 => K8sRetryableError(
K8sError::DeploymentNotFound(name.clone(), namespace.clone()),
),
_ => K8sRetryableError(K8sError::KubeApi(e.to_string())),
})
}
})
.await?;
Ok(result)
}
/// Sets the image of one container in a Deployment's pod template.
///
/// `container` is first resolved to a concrete container name, because the
/// strategic-merge patch below identifies the entry in `containers` by its
/// `name` field. The patch request is retried according to the client's
/// retry policy.
///
/// # Errors
/// Errors from container resolution, or `KubeApi` if the patch fails.
pub async fn update_deployment_image(
    &self,
    namespace: &str,
    deployment_name: &str,
    container: &str,
    new_image: &str,
) -> Result<()> {
    let target_container = self
        .resolve_container_name(namespace, deployment_name, container)
        .await?;
    let client = self.client.clone();
    let retry_config = self.retry_config.clone();
    let (namespace, deployment_name, new_image) = (
        namespace.to_string(),
        deployment_name.to_string(),
        new_image.to_string(),
    );
    with_retry(&retry_config, "K8s update_deployment_image", || {
        let client = client.clone();
        let namespace = namespace.clone();
        let deployment_name = deployment_name.clone();
        let container_name = target_container.clone();
        let image = new_image.clone();
        let patch = serde_json::json!({
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{
                            "name": container_name,
                            "image": image
                        }]
                    }
                }
            }
        });
        async move {
            let api: Api<Deployment> = Api::namespaced(client.as_ref().clone(), &namespace);
            let params = PatchParams::apply("apiforge");
            if let Err(e) = api
                .patch(&deployment_name, &params, &Patch::Strategic(patch))
                .await
            {
                return Err(K8sRetryableError(K8sError::KubeApi(format!(
                    "Failed to patch deployment: {}",
                    e
                ))));
            }
            Ok::<(), K8sRetryableError>(())
        }
    })
    .await?;
    Ok(())
}
/// Resolves a user-supplied container selector to a concrete container name.
///
/// `container` is matched against the pod template's container names first.
/// Only when no container has that exact name, and the selector parses as
/// `usize`, is it treated as a zero-based index into
/// `spec.template.spec.containers`. Checking names first keeps containers
/// with purely numeric names, such as `"2"`, addressable by name.
///
/// # Errors
/// `ManifestError` when the Deployment has no pod spec, when the index is
/// out of range, or when no container matches.
async fn resolve_container_name(
    &self,
    namespace: &str,
    deployment_name: &str,
    container: &str,
) -> Result<String> {
    let deployment = self.get_deployment(namespace, deployment_name).await?;
    let containers = deployment
        .spec
        .as_ref()
        .and_then(|s| s.template.spec.as_ref())
        .map(|s| &s.containers)
        .ok_or_else(|| K8sError::ManifestError("No containers in deployment".to_string()))?;

    // An exact name match always wins over index interpretation.
    if containers.iter().any(|c| c.name == container) {
        return Ok(container.to_string());
    }

    if let Ok(index) = container.parse::<usize>() {
        return containers
            .get(index)
            .map(|c| c.name.clone())
            .ok_or_else(|| {
                K8sError::ManifestError(format!("Container index {} not found", index)).into()
            });
    }

    Err(K8sError::ManifestError(format!(
        "Container '{}' not found. Available containers: {}",
        container,
        containers
            .iter()
            .map(|c| c.name.as_str())
            .collect::<Vec<_>>()
            .join(", ")
    ))
    .into())
}
/// Computes the current rollout status of a Deployment.
///
/// The rollout counts as `ready` only when all of the following hold:
/// - the controller has observed the latest spec generation
///   (`status.observedGeneration >= metadata.generation`);
/// - no replicas of older revisions remain
///   (`status.replicas <= status.updatedReplicas`);
/// - the ready, updated and available counts each reach the desired replica
///   count (`spec.replicas`, defaulting to 1 when unset).
///
/// Without the generation check, a status read right after a spec change
/// still describes the previous spec and reports it as fully rolled out.
///
/// # Errors
/// Propagates errors from fetching the Deployment.
pub async fn get_rollout_status(
    &self,
    namespace: &str,
    deployment_name: &str,
) -> Result<RolloutStatus> {
    let deployment = self.get_deployment(namespace, deployment_name).await?;
    let spec_replicas = deployment
        .spec
        .as_ref()
        .and_then(|s| s.replicas)
        .unwrap_or(1);
    let status = deployment.status.as_ref();
    let ready_replicas = status.and_then(|s| s.ready_replicas).unwrap_or(0);
    let updated_replicas = status.and_then(|s| s.updated_replicas).unwrap_or(0);
    let available_replicas = status.and_then(|s| s.available_replicas).unwrap_or(0);
    // Replicas of every revision (old and new) counted in the status.
    let total_replicas = status.and_then(|s| s.replicas).unwrap_or(0);

    // Until the controller has seen the latest generation, the counters above
    // still refer to the previous spec.
    let generation_observed = match (
        deployment.metadata.generation,
        status.and_then(|s| s.observed_generation),
    ) {
        (Some(generation), Some(observed)) => observed >= generation,
        (Some(_), None) => false,
        (None, _) => true,
    };
    // Replicas beyond the updated count belong to older revisions not yet scaled down.
    let old_replicas = total_replicas.saturating_sub(updated_replicas).max(0);

    let ready = generation_observed
        && old_replicas == 0
        && ready_replicas >= spec_replicas
        && updated_replicas >= spec_replicas
        && available_replicas >= spec_replicas;

    let message = if ready {
        format!(
            "Deployment {} successfully rolled out ({}/{} replicas ready)",
            deployment_name, ready_replicas, spec_replicas
        )
    } else if !generation_observed {
        format!(
            "Waiting for deployment {} spec update to be observed",
            deployment_name
        )
    } else if old_replicas > 0 {
        format!(
            "Rolling out: {}/{} ready, {}/{} updated, {} old replicas pending termination",
            ready_replicas, spec_replicas, updated_replicas, spec_replicas, old_replicas
        )
    } else {
        format!(
            "Rolling out: {}/{} ready, {}/{} updated",
            ready_replicas, spec_replicas, updated_replicas, spec_replicas
        )
    };

    Ok(RolloutStatus {
        ready,
        ready_replicas,
        desired_replicas: spec_replicas,
        updated_replicas,
        available_replicas,
        message,
    })
}
pub async fn wait_for_rollout<F>(
&self,
namespace: &str,
deployment_name: &str,
timeout_seconds: u64,
on_progress: F,
) -> Result<RolloutStatus>
where
F: Fn(&RolloutStatus),
{
let start = std::time::Instant::now();
let timeout = Duration::from_secs(timeout_seconds);
let poll_interval = Duration::from_secs(2);
loop {
let status = self.get_rollout_status(namespace, deployment_name).await?;
on_progress(&status);
if status.ready {
return Ok(status);
}
if start.elapsed() >= timeout {
return Err(K8sError::RolloutTimeout(timeout_seconds).into());
}
sleep(poll_interval).await;
}
}
pub async fn get_pods_for_deployment(
&self,
namespace: &str,
deployment_name: &str,
) -> Result<Vec<Pod>> {
let deployment = self.get_deployment(namespace, deployment_name).await?;
let match_labels = deployment
.spec
.as_ref()
.and_then(|s| s.selector.match_labels.clone())
.unwrap_or_default();
let label_selector = match_labels
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(",");
let client = self.client.clone();
let namespace = namespace.to_string();
let retry_config = self.retry_config.clone();
let result = with_retry(&retry_config, "K8s get_pods_for_deployment", || {
let client = client.clone();
let namespace = namespace.clone();
let label_selector = label_selector.clone();
async move {
let pods: Api<Pod> = Api::namespaced(client.as_ref().clone(), &namespace);
let list_params = ListParams::default().labels(&label_selector);
let pod_list = pods
.list(&list_params)
.await
.map_err(|e| K8sRetryableError(K8sError::KubeApi(e.to_string())))?;
Ok::<Vec<Pod>, K8sRetryableError>(pod_list.items)
}
})
.await?;
Ok(result)
}
pub async fn restart_deployment(&self, namespace: &str, deployment_name: &str) -> Result<()> {
let client = self.client.clone();
let namespace = namespace.to_string();
let deployment_name = deployment_name.to_string();
let retry_config = self.retry_config.clone();
with_retry(&retry_config, "K8s restart_deployment", || {
let client = client.clone();
let namespace = namespace.clone();
let deployment_name = deployment_name.clone();
async move {
let deployments: Api<Deployment> = Api::namespaced(client.as_ref().clone(), &namespace);
let patch = serde_json::json!({
"spec": {
"template": {
"metadata": {
"annotations": {
"kubectl.kubernetes.io/restartedAt": chrono::Utc::now().to_rfc3339()
}
}
}
}
});
let patch_params = PatchParams::apply("apiforge");
deployments
.patch(&deployment_name, &patch_params, &Patch::Strategic(patch))
.await
.map_err(|e| K8sRetryableError(K8sError::KubeApi(format!("Failed to restart deployment: {}", e))))?;
Ok::<(), K8sRetryableError>(())
}
}).await?;
Ok(())
}
/// Sets `spec.replicas` on the Deployment through a strategic-merge patch.
///
/// `replicas` is sent as given; no client-side validation is done.
///
/// # Errors
/// `KubeApi` if the patch fails after retries.
pub async fn scale_deployment(
    &self,
    namespace: &str,
    deployment_name: &str,
    replicas: i32,
) -> Result<()> {
    let desired = serde_json::json!({
        "spec": {
            "replicas": replicas
        }
    });
    let retry_config = self.retry_config.clone();
    let client = self.client.clone();
    let ns = namespace.to_owned();
    let name = deployment_name.to_owned();
    with_retry(&retry_config, "K8s scale_deployment", || {
        let client = client.clone();
        let ns = ns.clone();
        let name = name.clone();
        let desired = desired.clone();
        async move {
            let api: Api<Deployment> = Api::namespaced(client.as_ref().clone(), &ns);
            let params = PatchParams::apply("apiforge");
            match api.patch(&name, &params, &Patch::Strategic(desired)).await {
                Ok(_) => Ok(()),
                Err(e) => Err(K8sRetryableError(K8sError::KubeApi(format!(
                    "Failed to scale deployment: {}",
                    e
                )))),
            }
        }
    })
    .await?;
    Ok(())
}
/// Name of the kubeconfig context this client targets.
///
/// This is `"in-cluster"` for clients built with `new_in_cluster`.
pub fn context(&self) -> &str {
    self.context.as_str()
}
/// Rolls a Deployment back by restoring the pod template of an earlier
/// ReplicaSet revision.
///
/// `revision`:
/// - `Some(n)` targets the ReplicaSet annotated with
///   `deployment.kubernetes.io/revision: n`.
/// - `None` targets the second-newest revision, i.e. the one before the
///   current rollout.
///
/// Only ReplicaSets whose owner references carry this Deployment's UID are
/// considered. A label-selector list can also return ReplicaSets of other
/// Deployments with overlapping labels, or every ReplicaSet in the namespace
/// when `matchLabels` is empty.
///
/// The restored template:
/// - has the controller-managed `pod-template-hash` label removed;
/// - is sent with the strategic-merge `$patch: replace` directive, so it
///   *replaces* the current template. Containers, env vars and annotations
///   added after the target revision are dropped rather than merged in.
///
/// # Errors
/// - `RolloutFailed` if the requested or previous revision does not exist,
///   or if its ReplicaSet has no pod template.
/// - `ManifestError` if the template cannot be serialized.
/// - `KubeApi` for API failures after retries.
pub async fn rollback_deployment(
    &self,
    namespace: &str,
    deployment_name: &str,
    revision: Option<i64>,
) -> Result<()> {
    use k8s_openapi::api::apps::v1::ReplicaSet;

    const REVISION_ANNOTATION: &str = "deployment.kubernetes.io/revision";
    // Added by the deployment controller to each ReplicaSet's template; it
    // must not be copied back into the Deployment's own template.
    const POD_TEMPLATE_HASH_LABEL: &str = "pod-template-hash";

    // Parses a ReplicaSet's revision annotation, if present and numeric.
    fn revision_of(rs: &k8s_openapi::api::apps::v1::ReplicaSet) -> Option<i64> {
        rs.metadata
            .annotations
            .as_ref()
            .and_then(|ann| ann.get(REVISION_ANNOTATION))
            .and_then(|v| v.parse::<i64>().ok())
    }

    let deployment = self.get_deployment(namespace, deployment_name).await?;
    let deployment_uid = deployment.metadata.uid.clone();
    let client = self.client.clone();
    let namespace_str = namespace.to_string();
    let deployment_name_str = deployment_name.to_string();
    let retry_config = self.retry_config.clone();
    let match_labels = deployment
        .spec
        .as_ref()
        .and_then(|s| s.selector.match_labels.clone())
        .unwrap_or_default();
    let label_selector = match_labels
        .iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .collect::<Vec<_>>()
        .join(",");

    let rs_list = with_retry(&retry_config, "K8s list_replicasets", || {
        let client = client.clone();
        let namespace_str = namespace_str.clone();
        let label_selector = label_selector.clone();
        async move {
            let replicasets: Api<ReplicaSet> =
                Api::namespaced(client.as_ref().clone(), &namespace_str);
            let list_params = ListParams::default().labels(&label_selector);
            replicasets.list(&list_params).await.map_err(|e| {
                K8sRetryableError(K8sError::KubeApi(format!(
                    "Failed to list ReplicaSets: {}",
                    e
                )))
            })
        }
    })
    .await?;

    // Keep only ReplicaSets controlled by this Deployment.
    let mut replica_sets: Vec<ReplicaSet> = rs_list
        .items
        .into_iter()
        .filter(|rs| match deployment_uid.as_deref() {
            Some(uid) => rs
                .metadata
                .owner_references
                .as_ref()
                .map_or(false, |refs| refs.iter().any(|owner| owner.uid == uid)),
            // No UID on the Deployment: fall back to the label match alone.
            None => true,
        })
        .collect();
    // Newest revision first; ReplicaSets without a parsable revision count as 0.
    replica_sets.sort_by_key(|rs| std::cmp::Reverse(revision_of(rs).unwrap_or(0)));

    let target_rs = match revision {
        Some(target_rev) => replica_sets
            .iter()
            .find(|rs| revision_of(rs) == Some(target_rev))
            .ok_or_else(|| {
                K8sError::RolloutFailed(format!(
                    "Revision {} not found for deployment {}",
                    target_rev, deployment_name
                ))
            })?,
        // Index 0 is the current revision, so index 1 is the one before it.
        None => replica_sets.get(1).ok_or_else(|| {
            K8sError::RolloutFailed("No previous revision found to rollback to".to_string())
        })?,
    };
    let target_revision = revision_of(target_rs);

    // `ReplicaSetSpec.template` is optional; never send a null template.
    let mut target_template = target_rs
        .spec
        .as_ref()
        .and_then(|s| s.template.clone())
        .ok_or_else(|| {
            K8sError::RolloutFailed("Target ReplicaSet has no template".to_string())
        })?;
    if let Some(labels) = target_template
        .metadata
        .as_mut()
        .and_then(|meta| meta.labels.as_mut())
    {
        labels.remove(POD_TEMPLATE_HASH_LABEL);
    }

    // `$patch: replace` makes the strategic merge swap in the whole template
    // instead of merging it into the current one.
    let mut template_value = serde_json::to_value(&target_template).map_err(|e| {
        K8sError::ManifestError(format!("Failed to serialize rollback template: {}", e))
    })?;
    if let Some(fields) = template_value.as_object_mut() {
        fields.insert("$patch".to_string(), serde_json::Value::from("replace"));
    }
    let patch = serde_json::json!({
        "spec": {
            "template": template_value
        }
    });

    with_retry(&retry_config, "K8s rollback_deployment", || {
        let client = client.clone();
        let namespace_str = namespace_str.clone();
        let deployment_name_str = deployment_name_str.clone();
        let patch = patch.clone();
        async move {
            let deployments: Api<Deployment> =
                Api::namespaced(client.as_ref().clone(), &namespace_str);
            let patch_params = PatchParams::apply("apiforge-rollback");
            deployments
                .patch(&deployment_name_str, &patch_params, &Patch::Strategic(patch))
                .await
                .map_err(|e| {
                    K8sRetryableError(K8sError::KubeApi(format!(
                        "Failed to rollback deployment: {}",
                        e
                    )))
                })?;
            Ok::<(), K8sRetryableError>(())
        }
    })
    .await?;

    match target_revision {
        Some(rev) => {
            tracing::info!("Rolled back deployment {} to revision {}", deployment_name, rev);
        }
        None => {
            tracing::info!(
                "Rolled back deployment {} to previous revision",
                deployment_name
            );
        }
    }
    Ok(())
}
/// Reads the Deployment's `deployment.kubernetes.io/revision` annotation.
///
/// Returns `Ok(None)` when the annotation is missing or is not an integer.
///
/// # Errors
/// Propagates errors from fetching the Deployment.
pub async fn get_deployment_revision(
    &self,
    namespace: &str,
    deployment_name: &str,
) -> Result<Option<i64>> {
    let deployment = self.get_deployment(namespace, deployment_name).await?;
    let raw_revision = deployment
        .metadata
        .annotations
        .as_ref()
        .and_then(|annotations| annotations.get("deployment.kubernetes.io/revision"));
    Ok(match raw_revision {
        Some(text) => text.parse::<i64>().ok(),
        None => None,
    })
}
/// Lists the revision numbers recorded on the Deployment's ReplicaSets,
/// newest first.
///
/// Only ReplicaSets whose owner references carry this Deployment's UID are
/// counted. A label-selector list alone can include ReplicaSets of other
/// Deployments with overlapping labels, or every ReplicaSet in the namespace
/// when `matchLabels` is empty. ReplicaSets without a numeric revision
/// annotation are skipped.
///
/// # Errors
/// Errors from fetching the Deployment, or `KubeApi` if listing ReplicaSets
/// fails after retries.
pub async fn list_deployment_revisions(
    &self,
    namespace: &str,
    deployment_name: &str,
) -> Result<Vec<i64>> {
    use k8s_openapi::api::apps::v1::ReplicaSet;

    // True when `rs` is owned by the object with `owner_uid`, or when no UID
    // is known, in which case the label match alone decides.
    fn is_owned_by(rs: &k8s_openapi::api::apps::v1::ReplicaSet, owner_uid: Option<&str>) -> bool {
        match owner_uid {
            Some(uid) => rs
                .metadata
                .owner_references
                .as_ref()
                .map_or(false, |refs| refs.iter().any(|owner| owner.uid == uid)),
            None => true,
        }
    }

    let deployment = self.get_deployment(namespace, deployment_name).await?;
    let deployment_uid = deployment.metadata.uid.clone();
    let client = self.client.clone();
    let namespace = namespace.to_string();
    let retry_config = self.retry_config.clone();
    let match_labels = deployment
        .spec
        .as_ref()
        .and_then(|s| s.selector.match_labels.clone())
        .unwrap_or_default();
    let label_selector = match_labels
        .iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .collect::<Vec<_>>()
        .join(",");
    let rs_list = with_retry(&retry_config, "K8s list_deployment_revisions", || {
        let client = client.clone();
        let namespace = namespace.clone();
        let label_selector = label_selector.clone();
        async move {
            let replicasets: Api<ReplicaSet> =
                Api::namespaced(client.as_ref().clone(), &namespace);
            let list_params = ListParams::default().labels(&label_selector);
            replicasets.list(&list_params).await.map_err(|e| {
                K8sRetryableError(K8sError::KubeApi(format!(
                    "Failed to list ReplicaSets: {}",
                    e
                )))
            })
        }
    })
    .await?;
    let mut revisions: Vec<i64> = rs_list
        .items
        .iter()
        .filter(|rs| is_owned_by(rs, deployment_uid.as_deref()))
        .filter_map(|rs| {
            rs.metadata
                .annotations
                .as_ref()
                .and_then(|ann| ann.get("deployment.kubernetes.io/revision"))
                .and_then(|v| v.parse::<i64>().ok())
        })
        .collect();
    // Newest first.
    revisions.sort_unstable_by(|a, b| b.cmp(a));
    Ok(revisions)
}
}