use std::collections::HashSet;
use std::time::Duration;
use k8s_openapi::api::core::v1::Namespace;
use kube::api::{DeleteParams, ListParams, ObjectList, Patch, PatchParams};
use kube::core::ObjectMeta;
use kube::{Api, Client, ResourceExt};
use tracing::{error, info};
use crate::error::Result;
use crate::scope::ApiScope;
use crate::traits::KubeResource;
/// Outcome of an "ensure" operation, wrapping the resulting resource together
/// with what happened to it.
#[derive(Debug)]
pub enum EnsureOutcome<T> {
    /// The resource was newly created.
    Created(T),
    /// The resource already existed and was modified.
    Updated(T),
    /// The resource already existed and was left as it was.
    Unchanged(T),
}

impl<T> EnsureOutcome<T> {
    /// Consumes the outcome and returns the wrapped resource, regardless of variant.
    pub fn into_resource(self) -> T {
        // Every variant carries exactly one `T`, so the or-pattern is irrefutable.
        let (Self::Created(inner) | Self::Updated(inner) | Self::Unchanged(inner)) = self;
        inner
    }

    /// Returns `true` for [`EnsureOutcome::Created`] and [`EnsureOutcome::Updated`],
    /// and `false` for [`EnsureOutcome::Unchanged`].
    pub fn was_changed(&self) -> bool {
        !matches!(self, Self::Unchanged(_))
    }
}
/// Ensures that a namespace called `name` exists.
///
/// Builds a `Namespace` object that carries nothing but `metadata.name` and
/// sends it as a forced apply patch, attributed to `field_manager`, through a
/// cluster-wide `Api<Namespace>`. The object returned by the API is handed back
/// to the caller.
///
/// # Errors
///
/// Returns an error if the patch request fails.
pub async fn ensure_namespace(
    client: Client,
    name: &str,
    field_manager: &str,
) -> Result<Namespace> {
    let api: Api<Namespace> = Api::all(client);
    let ns = Namespace {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            ..Default::default()
        },
        ..Default::default()
    };

    info!(%name, "Ensuring namespace exists");

    let params = PatchParams::apply(field_manager).force();
    Ok(api.patch(name, &params, &Patch::Apply(&ns)).await?)
}
/// Sends `resource` as a forced apply patch through `api`, attributed to
/// `field_manager`, and returns the object the API responds with.
///
/// The patch targets `metadata.name` of `resource`; when that field is absent
/// the placeholder `"[unnamed]"` is used as the target name.
// NOTE(review): the placeholder is presumably rejected by the API server as an
// invalid object name — confirm whether a missing name should fail earlier.
///
/// # Errors
///
/// Returns an error if the patch request fails.
async fn apply_resource_inner<T>(api: Api<T>, resource: &T, field_manager: &str) -> Result<T>
where
    T: KubeResource,
{
    let name = resource.meta().name.as_deref().unwrap_or("[unnamed]");
    let params = PatchParams::apply(field_manager).force();
    Ok(api.patch(name, &params, &Patch::Apply(resource)).await?)
}
async fn delete_resource_inner<T>(api: Api<T>, name: &str) -> Result<bool>
where
T: KubeResource,
{
match api.delete(name, &DeleteParams::default()).await {
Ok(_) => Ok(true),
Err(kube::Error::Api(e)) if e.code == 404 => Ok(false),
Err(e) => Err(e.into()),
}
}
async fn get_resource_inner<T>(api: Api<T>, name: &str) -> Result<Option<T>>
where
T: KubeResource,
{
match api.get(name).await {
Ok(r) => Ok(Some(r)),
Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Sends `patch` as a merge patch against the object called `name`, using
/// default patch parameters, and returns the object the API responds with.
///
/// # Errors
///
/// Returns an error if the patch request fails.
async fn patch_metadata_inner<T>(api: Api<T>, name: &str, patch: serde_json::Value) -> Result<T>
where
    T: KubeResource,
{
    let patch_params = PatchParams::default();
    let patched = api
        .patch(name, &patch_params, &Patch::Merge(&patch))
        .await?;
    Ok(patched)
}
/// Lists objects through `api` using the given list parameters.
///
/// # Errors
///
/// Returns an error if the list request fails.
async fn list_inner<T>(api: Api<T>, params: ListParams) -> Result<ObjectList<T>>
where
    T: KubeResource,
{
    Ok(api.list(&params).await?)
}
pub async fn apply_resource<T, Scope>(
client: Client,
scope: Scope,
resource: &T,
field_manager: &str,
) -> Result<T>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let name = resource.meta().name.as_deref().unwrap_or("[unnamed]");
let kind = T::kind(&());
match scope.namespace() {
Some(namespace) => info!(%namespace, %kind, %name, "Applying resource"),
None => info!(%kind, %name, "Applying resource"),
}
apply_resource_inner(scope.into_api(client), resource, field_manager).await
}
pub async fn delete_resource<T, Scope>(client: Client, scope: Scope, name: &str) -> Result<bool>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match scope.namespace() {
Some(namespace) => info!(%namespace, %kind, %name, "Deleting resource"),
None => info!(%kind, %name, "Deleting resource"),
}
delete_resource_inner(scope.into_api(client), name).await
}
pub async fn get_resource<T, Scope>(client: Client, scope: Scope, name: &str) -> Result<Option<T>>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match scope.namespace() {
Some(namespace) => info!(%namespace, %kind, %name, "Getting resource"),
None => info!(%kind, %name, "Getting resource"),
}
get_resource_inner(scope.into_api(client), name).await
}
pub async fn exists<T, Scope>(client: Client, scope: Scope, name: &str) -> Result<bool>
where
T: KubeResource,
Scope: ApiScope<T>,
{
Ok(get_resource_inner(scope.into_api(client), name)
.await?
.is_some())
}
/// Applies `resource` within `scope` and reports whether that created, updated,
/// or left the live object unchanged.
///
/// The live object is fetched first, then `resource` is applied. The outcome is:
///
/// * [`EnsureOutcome::Created`] when no live object was found beforehand;
/// * [`EnsureOutcome::Updated`] when a live object existed and its
///   `resourceVersion` differs from the one on the applied result;
/// * [`EnsureOutcome::Unchanged`] when both `resourceVersion`s are equal.
///
/// The lookup and the apply are two separate requests, so a concurrent writer
/// acting between them can skew the reported outcome.
///
/// # Errors
///
/// Propagates any error from the lookup (other than "not found") or the apply.
pub async fn ensure_resource<T, Scope>(
    client: Client,
    scope: Scope,
    resource: &T,
    field_manager: &str,
) -> Result<EnsureOutcome<T>>
where
    T: KubeResource,
    Scope: ApiScope<T>,
{
    let name = resource.meta().name.as_deref().unwrap_or("[unnamed]");
    let kind = T::kind(&());
    // Log the namespace under the same `namespace` field the other helpers in
    // this module use, so log queries work uniformly across operations.
    match scope.namespace() {
        Some(ns) => info!(namespace = %ns, %kind, %name, "Ensuring resource"),
        None => info!(%kind, %name, "Ensuring resource"),
    }

    let api = scope.into_api(client);
    let live = get_resource_inner(api.clone(), name).await?;
    let applied = apply_resource_inner(api, resource, field_manager).await?;

    // Existence decides Created vs. not; the resourceVersion comparison is only
    // consulted for objects that were already there.
    let outcome = match live {
        None => {
            info!(%kind, %name, "Resource created");
            EnsureOutcome::Created(applied)
        }
        Some(previous) => {
            if previous.meta().resource_version != applied.meta().resource_version {
                info!(%kind, %name, "Resource updated");
                EnsureOutcome::Updated(applied)
            } else {
                info!(%kind, %name, "Resource unchanged");
                EnsureOutcome::Unchanged(applied)
            }
        }
    };
    Ok(outcome)
}
pub async fn list_resources_scoped<T, Scope>(
client: Client,
scope: Scope,
params: ListParams,
) -> Result<ObjectList<T>>
where
T: KubeResource,
Scope: ApiScope<T>,
{
list_inner(scope.into_api(client), params).await
}
/// Collects the names of all objects of type `T` that match `label_selector`.
///
/// The listing goes through a cluster-wide handle (`Api::all`), and each name is
/// taken from `ResourceExt::name_any`. Because the result is a set, objects
/// that share a name yield a single entry.
///
/// # Errors
///
/// Propagates any error from the underlying list call.
pub async fn list_resource_names<T>(client: Client, label_selector: &str) -> Result<HashSet<String>>
where
    T: KubeResource,
{
    let api = Api::<T>::all(client);
    let selector = ListParams::default().labels(label_selector);
    let listed = list_inner(api, selector).await?;

    let names = listed.items.iter().map(|item| item.name_any()).collect();
    Ok(names)
}
pub async fn wait_for_resources<T, Scope>(
client: Client,
scope: Scope,
interval: Duration,
) -> Result<Vec<T>>
where
T: KubeResource,
Scope: ApiScope<T> + Clone,
{
let kind = T::kind(&());
let namespace = scope.namespace();
match namespace {
Some(ns) => info!(namespace = %ns, %kind, "Waiting for at least one resource"),
None => info!(%kind, "Waiting for at least one resource"),
}
loop {
let api: Api<T> = scope.clone().into_api(client.clone());
match api.list(&Default::default()).await {
Ok(list) if !list.items.is_empty() => {
info!(%kind, count = list.items.len(), "Resources found");
return Ok(list.items);
}
Ok(_) => {
info!(%kind, ?interval, "No resources found, retrying");
tokio::time::sleep(interval).await;
}
Err(kube::Error::Api(e)) if e.code == 404 => {
let backoff = interval.min(Duration::from_secs(60));
error!(%kind, code = 404, ?backoff, "CRD not found, retrying");
tokio::time::sleep(backoff).await;
}
Err(e) => {
let backoff = (interval * 2).min(Duration::from_secs(60));
error!(%kind, error = %e, ?backoff, "API error, retrying");
tokio::time::sleep(backoff).await;
}
}
}
}
/// Polls the object called `name` within `scope` until `predicate` accepts it,
/// then returns that object.
///
/// Sleep between attempts:
///
/// * object found but `predicate` returned `false` — `interval`;
/// * object not found — `interval`;
/// * any other error — twice `interval`, capped at 60 seconds.
///
/// Errors are logged and retried rather than returned, and there is no overall
/// timeout: the loop only ends once `predicate` returns `true`.
pub async fn wait_for_condition<T, Scope, F>(
    client: Client,
    scope: Scope,
    name: &str,
    interval: Duration,
    predicate: F,
) -> Result<T>
where
    T: KubeResource,
    Scope: ApiScope<T> + Clone,
    F: Fn(&T) -> bool,
{
    let kind = T::kind(&());
    match scope.namespace() {
        Some(ns) => info!(namespace = %ns, %kind, %name, "Waiting for condition"),
        None => info!(%kind, %name, "Waiting for condition"),
    }

    loop {
        // `get_resource_inner` takes the handle by value, so a fresh one is
        // built for each attempt.
        match get_resource_inner(scope.clone().into_api(client.clone()), name).await {
            Ok(Some(r)) if predicate(&r) => return Ok(r),
            Ok(Some(_)) => {
                info!(%kind, %name, ?interval, "Condition not met, retrying");
                tokio::time::sleep(interval).await;
            }
            Ok(None) => {
                info!(%kind, %name, ?interval, "Resource not found, retrying");
                tokio::time::sleep(interval).await;
            }
            Err(e) => {
                // `interval * 2` panics on overflow; saturate so a very large
                // interval is simply clamped to the 60-second cap.
                let backoff = interval.saturating_mul(2).min(Duration::from_secs(60));
                error!(%kind, %name, error = %e, ?backoff, "API error, retrying");
                tokio::time::sleep(backoff).await;
            }
        }
    }
}
pub async fn patch_labels<T, Scope>(
client: Client,
scope: Scope,
name: &str,
labels: &[(&str, &str)],
) -> Result<T>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match scope.namespace() {
Some(ns) => info!(namespace = %ns, %kind, %name, "Patching labels"),
None => info!(%kind, %name, "Patching labels"),
}
let map: serde_json::Map<String, serde_json::Value> = labels
.iter()
.map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string())))
.collect();
let patch = serde_json::json!({ "metadata": { "labels": map } });
patch_metadata_inner(scope.into_api(client), name, patch).await
}
pub async fn patch_annotations<T, Scope>(
client: Client,
scope: Scope,
name: &str,
annotations: &[(&str, &str)],
) -> Result<T>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match scope.namespace() {
Some(ns) => info!(namespace = %ns, %kind, %name, "Patching annotations"),
None => info!(%kind, %name, "Patching annotations"),
}
let map: serde_json::Map<String, serde_json::Value> = annotations
.iter()
.map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string())))
.collect();
let patch = serde_json::json!({ "metadata": { "annotations": map } });
patch_metadata_inner(scope.into_api(client), name, patch).await
}
pub async fn remove_labels<T, Scope>(
client: Client,
scope: Scope,
name: &str,
keys: &[&str],
) -> Result<T>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match scope.namespace() {
Some(ns) => info!(namespace = %ns, %kind, %name, "Removing labels"),
None => info!(%kind, %name, "Removing labels"),
}
let map: serde_json::Map<String, serde_json::Value> = keys
.iter()
.map(|k| (k.to_string(), serde_json::Value::Null))
.collect();
let patch = serde_json::json!({ "metadata": { "labels": map } });
patch_metadata_inner(scope.into_api(client), name, patch).await
}
pub async fn remove_annotations<T, Scope>(
client: Client,
scope: Scope,
name: &str,
keys: &[&str],
) -> Result<T>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match scope.namespace() {
Some(ns) => info!(namespace = %ns, %kind, %name, "Removing annotations"),
None => info!(%kind, %name, "Removing annotations"),
}
let map: serde_json::Map<String, serde_json::Value> = keys
.iter()
.map(|k| (k.to_string(), serde_json::Value::Null))
.collect();
let patch = serde_json::json!({ "metadata": { "annotations": map } });
patch_metadata_inner(scope.into_api(client), name, patch).await
}