use futures::TryStreamExt;
use kube_client::{Api, Resource};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use thiserror::Error;
use crate::watcher::{self, watch_object};
#[derive(Debug, Error)]
pub enum Error {
#[error("failed to probe for whether the condition is fulfilled yet: {0}")]
ProbeFailed(#[source] watcher::Error),
}
pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
where
K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
{
let stream = watch_object(api, name).try_skip_while(|obj| {
let matches = cond.matches_object(obj.as_ref());
futures::future::ok(!matches)
});
futures::pin_mut!(stream);
let obj = stream
.try_next()
.await
.map_err(Error::ProbeFailed)?
.expect("stream must not terminate");
Ok(obj)
}
pub trait Condition<K> {
fn matches_object(&self, obj: Option<&K>) -> bool;
fn not(self) -> conditions::Not<Self>
where
Self: Sized,
{
conditions::Not(self)
}
fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
where
Self: Sized,
{
conditions::And(self, other)
}
fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
where
Self: Sized,
{
conditions::Or(self, other)
}
}
impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
fn matches_object(&self, obj: Option<&K>) -> bool {
(self)(obj)
}
}
pub mod conditions {
pub use super::Condition;
use k8s_openapi::{
api::{batch::v1::Job, core::v1::Pod},
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
};
use kube_client::Resource;
#[must_use]
pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
move |obj: Option<&K>| {
obj.map_or(
true,
|obj| obj.meta().uid.as_deref() != Some(uid),
)
}
}
#[must_use]
pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
|obj: Option<&CustomResourceDefinition>| {
if let Some(o) = obj {
if let Some(s) = &o.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
return pcond.status == "True";
}
}
}
}
false
}
}
#[must_use]
pub fn is_pod_running() -> impl Condition<Pod> {
|obj: Option<&Pod>| {
if let Some(pod) = &obj {
if let Some(status) = &pod.status {
if let Some(phase) = &status.phase {
return phase == "Running";
}
}
}
false
}
}
#[must_use]
pub fn is_job_completed() -> impl Condition<Job> {
|obj: Option<&Job>| {
if let Some(job) = &obj {
if let Some(s) = &job.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
return pcond.status == "True";
}
}
}
}
false
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Not<A>(pub(super) A);
impl<A: Condition<K>, K> Condition<K> for Not<A> {
fn matches_object(&self, obj: Option<&K>) -> bool {
!self.0.matches_object(obj)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct And<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Condition<K> for And<A, B>
where
A: Condition<K>,
B: Condition<K>,
{
fn matches_object(&self, obj: Option<&K>) -> bool {
self.0.matches_object(obj) && self.1.matches_object(obj)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Or<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Condition<K> for Or<A, B>
where
A: Condition<K>,
B: Condition<K>,
{
fn matches_object(&self, obj: Option<&K>) -> bool {
self.0.matches_object(obj) || self.1.matches_object(obj)
}
}
}
pub mod delete {
use super::{await_condition, conditions};
use kube_client::{api::DeleteParams, Api, Resource};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("deleted object has no UID to wait for")]
NoUid,
#[error("failed to delete object: {0}")]
Delete(#[source] kube_client::Error),
#[error("failed to wait for object to be deleted: {0}")]
Await(#[source] super::Error),
}
#[allow(clippy::module_name_repetitions)]
pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
api: Api<K>,
name: &str,
delete_params: &DeleteParams,
) -> Result<(), Error> {
let deleted_obj_uid = api
.delete(name, delete_params)
.await
.map_err(Error::Delete)?
.either(
|mut obj| obj.meta_mut().uid.take(),
|status| status.details.map(|details| details.uid),
)
.ok_or(Error::NoUid)?;
await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
.await
.map_err(Error::Await)?;
Ok(())
}
}