use std::{future, pin::pin};
use futures::TryStreamExt;
use kube_client::{Api, Resource};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use thiserror::Error;
use crate::watcher::{self, watch_object};
/// Error returned by [`await_condition`].
#[derive(Debug, Error)]
pub enum Error {
    /// The watch stream used to observe the object's state returned an error.
    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
    ProbeFailed(#[source] watcher::Error),
}
/// Watch the object called `name` via `api` until `cond` is satisfied.
///
/// Returns the state of the object (`None` if it does not exist) at the first
/// moment the condition matched. Watch errors are wrapped in
/// [`Error::ProbeFailed`].
///
/// `missing_panics_doc` is allowed because the only panic path is the
/// `expect` below, which guards an internal invariant (see comment there),
/// not a condition callers can trigger.
#[allow(clippy::missing_panics_doc)]
pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
where
    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
{
    // Skip every event for which the condition does not yet hold; the next
    // item on the stream is then the first state that satisfied it.
    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
        let matches = cond.matches_object(obj.as_ref());
        future::ready(Ok(!matches))
    }));
    let obj = stream
        .try_next()
        .await
        .map_err(Error::ProbeFailed)?
        // `watch_object` yields an endless stream of watch events, so
        // `Ok(None)` (stream end) should be unreachable here.
        .expect("stream must not terminate");
    Ok(obj)
}
/// A condition over the (possibly absent) state of a watched object.
///
/// `obj` is `None` when the object does not currently exist. Implemented for
/// any `Fn(Option<&K>) -> bool`, so plain closures can be used directly.
pub trait Condition<K> {
    /// Returns whether the condition holds for the given object state.
    fn matches_object(&self, obj: Option<&K>) -> bool;

    /// Returns a condition that holds exactly when `self` does not.
    fn not(self) -> conditions::Not<Self>
    where
        Self: Sized,
    {
        conditions::Not(self)
    }

    /// Returns a condition that holds when both `self` and `other` hold.
    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
    where
        Self: Sized,
    {
        conditions::And(self, other)
    }

    /// Returns a condition that holds when `self`, `other`, or both hold.
    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
    where
        Self: Sized,
    {
        conditions::Or(self, other)
    }
}
/// Any function or closure of the shape `Fn(Option<&K>) -> bool` is usable as
/// a [`Condition`] by simply invoking it with the object state.
impl<K, F> Condition<K> for F
where
    F: Fn(Option<&K>) -> bool,
{
    fn matches_object(&self, obj: Option<&K>) -> bool {
        self(obj)
    }
}
pub mod conditions {
pub use super::Condition;
use k8s_openapi::{
api::{
apps::v1::Deployment,
batch::v1::Job,
core::v1::{Pod, Service},
networking::v1::Ingress,
},
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
};
use kube_client::Resource;
#[must_use]
pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
move |obj: Option<&K>| {
obj.map_or(
true,
|obj| obj.meta().uid.as_deref() != Some(uid),
)
}
}
#[must_use]
pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
|obj: Option<&CustomResourceDefinition>| {
if let Some(o) = obj {
if let Some(s) = &o.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
return pcond.status == "True";
}
}
}
}
false
}
}
#[must_use]
pub fn is_pod_running() -> impl Condition<Pod> {
|obj: Option<&Pod>| {
if let Some(pod) = &obj {
if let Some(status) = &pod.status {
if let Some(phase) = &status.phase {
return phase == "Running";
}
}
}
false
}
}
#[must_use]
pub fn is_job_completed() -> impl Condition<Job> {
|obj: Option<&Job>| {
if let Some(job) = &obj {
if let Some(s) = &job.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
return pcond.status == "True";
}
}
}
}
false
}
}
#[must_use]
pub fn is_deployment_completed() -> impl Condition<Deployment> {
|obj: Option<&Deployment>| {
if let Some(depl) = &obj {
if let Some(s) = &depl.status {
if let Some(conds) = &s.conditions {
if let Some(dcond) = conds.iter().find(|c| {
c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
}) {
return dcond.status == "True";
}
}
}
}
false
}
}
#[must_use]
pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
|obj: Option<&Service>| {
if let Some(svc) = &obj {
if let Some(spec) = &svc.spec {
if spec.type_ != Some("LoadBalancer".to_string()) {
return true;
}
if let Some(s) = &svc.status {
if let Some(lbs) = &s.load_balancer {
if let Some(ings) = &lbs.ingress {
return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
}
}
}
}
}
false
}
}
#[must_use]
pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
|obj: Option<&Ingress>| {
if let Some(ing) = &obj {
if let Some(s) = &ing.status {
if let Some(lbs) = &s.load_balancer {
if let Some(ings) = &lbs.ingress {
return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
}
}
}
}
false
}
}
/// Inverts an underlying [`Condition`]; constructed via [`Condition::not`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Not<A>(pub(super) A);
impl<A: Condition<K>, K> Condition<K> for Not<A> {
    fn matches_object(&self, obj: Option<&K>) -> bool {
        !self.0.matches_object(obj)
    }
}
/// Conjunction of two [`Condition`]s; constructed via [`Condition::and`].
///
/// Note: `&&` short-circuits, so the second condition is not probed when the
/// first already fails.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct And<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Condition<K> for And<A, B>
where
    A: Condition<K>,
    B: Condition<K>,
{
    fn matches_object(&self, obj: Option<&K>) -> bool {
        self.0.matches_object(obj) && self.1.matches_object(obj)
    }
}
/// Disjunction of two [`Condition`]s; constructed via [`Condition::or`].
///
/// Note: `||` short-circuits, so the second condition is not probed when the
/// first already holds.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Or<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Condition<K> for Or<A, B>
where
    A: Condition<K>,
    B: Condition<K>,
{
    fn matches_object(&self, obj: Option<&K>) -> bool {
        self.0.matches_object(obj) || self.1.matches_object(obj)
    }
}
// Unit tests for the canned conditions; fixtures are YAML dumps of realistic
// cluster objects. The module is gated behind `#[cfg(test)]` (previously
// missing) so that test-only code and the `serde_yaml` dev-dependency are not
// compiled into regular builds.
#[cfg(test)]
mod tests {
    #[test]
    fn crd_established_ok() {
        use super::{is_crd_established, Condition};
        let crd = r#"
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: testthings.kube.rs
spec:
  group: kube.rs
  names:
    categories: []
    kind: TestThing
    plural: testthings
    shortNames: []
    singular: testthing
  scope: Namespaced
  versions:
  - additionalPrinterColumns: []
    name: v1
    schema:
      openAPIV3Schema:
        type: object
        x-kubernetes-preserve-unknown-fields: true
    served: true
    storage: true
status:
  acceptedNames:
    kind: TestThing
    listKind: TestThingList
    plural: testthings
    singular: testthing
  conditions:
  - lastTransitionTime: "2025-03-06T03:10:03Z"
    message: no conflicts found
    reason: NoConflicts
    status: "True"
    type: NamesAccepted
  - lastTransitionTime: "2025-03-06T03:10:03Z"
    message: the initial names have been accepted
    reason: InitialNamesAccepted
    status: "True"
    type: Established
  storedVersions:
  - v1
"#;
        let c = serde_yaml::from_str(crd).unwrap();
        assert!(is_crd_established().matches_object(Some(&c)))
    }

    #[test]
    fn crd_established_fail() {
        use super::{is_crd_established, Condition};
        let crd = r#"
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: testthings.kube.rs
spec:
  group: kube.rs
  names:
    categories: []
    kind: TestThing
    plural: testthings
    shortNames: []
    singular: testthing
  scope: Namespaced
  versions:
  - additionalPrinterColumns: []
    name: v1
    schema:
      openAPIV3Schema:
        type: object
        x-kubernetes-preserve-unknown-fields: true
    served: true
    storage: true
status:
  acceptedNames:
    kind: TestThing
    listKind: TestThingList
    plural: testthings
    singular: testthing
  conditions:
  - lastTransitionTime: "2025-03-06T03:10:03Z"
    message: no conflicts found
    reason: NoConflicts
    status: "True"
    type: NamesAccepted
  - lastTransitionTime: "2025-03-06T03:10:03Z"
    message: the initial names have been accepted
    reason: InitialNamesAccepted
    status: "False"
    type: Established
  storedVersions:
  - v1
"#;
        let c = serde_yaml::from_str(crd).unwrap();
        assert!(!is_crd_established().matches_object(Some(&c)))
    }

    #[test]
    fn crd_established_missing() {
        use super::{is_crd_established, Condition};
        assert!(!is_crd_established().matches_object(None))
    }

    #[test]
    fn pod_running_ok() {
        use super::{is_pod_running, Condition};
        let pod = r#"
apiVersion: v1
kind: Pod
metadata:
  namespace: default
  name: testpod
spec:
  containers:
  - name: testcontainer
    image: alpine
    command: [ sleep ]
    args: [ "100000" ]
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2025-03-06T03:53:07Z"
    status: "True"
    type: PodReadyToStartContainers
  - lastProbeTime: null
    lastTransitionTime: "2025-03-06T03:52:58Z"
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: "2025-03-06T03:53:24Z"
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: "2025-03-06T03:53:24Z"
    status: "True"
    type: ContainersReady
  - lastProbeTime: null
    lastTransitionTime: "2025-03-06T03:52:58Z"
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
    image: docker.io/library/alpine:latest
    imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
    lastState: {}
    name: testcontainer
    ready: true
    restartCount: 0
    started: true
    state:
      running:
        startedAt: "2025-03-06T03:59:20Z"
  phase: Running
  qosClass: Burstable
"#;
        let p = serde_yaml::from_str(pod).unwrap();
        assert!(is_pod_running().matches_object(Some(&p)))
    }

    #[test]
    fn pod_running_unschedulable() {
        use super::{is_pod_running, Condition};
        let pod = r#"
apiVersion: v1
kind: Pod
metadata:
  namespace: default
  name: testpod
spec:
  containers:
  - name: testcontainer
    image: alpine
    command: [ sleep ]
    args: [ "100000" ]
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2025-03-06T03:52:25Z"
    message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1
      nodes are available: 1 Preemption is not helpful for scheduling.'
    reason: Unschedulable
    status: "False"
    type: PodScheduled
  phase: Pending
  qosClass: Burstable
"#;
        let p = serde_yaml::from_str(pod).unwrap();
        assert!(!is_pod_running().matches_object(Some(&p)))
    }

    #[test]
    fn pod_running_missing() {
        use super::{is_pod_running, Condition};
        assert!(!is_pod_running().matches_object(None))
    }

    #[test]
    fn job_completed_ok() {
        use super::{is_job_completed, Condition};
        let job = r#"
apiVersion: batch/v1
kind: Job
metadata:
  name: pi
  namespace: default
spec:
  template:
    spec:
      containers:
      - name: pi
        command:
        - perl
        - -Mbignum=bpi
        - -wle
        - print bpi(2000)
        image: perl:5.34.0
        imagePullPolicy: IfNotPresent
status:
  completionTime: "2025-03-06T05:27:56Z"
  conditions:
  - lastProbeTime: "2025-03-06T05:27:56Z"
    lastTransitionTime: "2025-03-06T05:27:56Z"
    message: Reached expected number of succeeded pods
    reason: CompletionsReached
    status: "True"
    type: SuccessCriteriaMet
  - lastProbeTime: "2025-03-06T05:27:56Z"
    lastTransitionTime: "2025-03-06T05:27:56Z"
    message: Reached expected number of succeeded pods
    reason: CompletionsReached
    status: "True"
    type: Complete
  ready: 0
  startTime: "2025-03-06T05:27:27Z"
  succeeded: 1
  terminating: 0
  uncountedTerminatedPods: {}
"#;
        let j = serde_yaml::from_str(job).unwrap();
        assert!(is_job_completed().matches_object(Some(&j)))
    }

    #[test]
    fn job_completed_running() {
        use super::{is_job_completed, Condition};
        let job = r#"
apiVersion: batch/v1
kind: Job
metadata:
  name: pi
  namespace: default
spec:
  backoffLimit: 4
  completionMode: NonIndexed
  completions: 1
  manualSelector: false
  parallelism: 1
  template:
    spec:
      containers:
      - name: pi
        command:
        - perl
        - -Mbignum=bpi
        - -wle
        - print bpi(2000)
        image: perl:5.34.0
        imagePullPolicy: IfNotPresent
status:
  active: 1
  ready: 0
  startTime: "2025-03-06T05:27:27Z"
  terminating: 0
  uncountedTerminatedPods: {}
"#;
        let j = serde_yaml::from_str(job).unwrap();
        assert!(!is_job_completed().matches_object(Some(&j)))
    }

    #[test]
    fn job_completed_missing() {
        use super::{is_job_completed, Condition};
        assert!(!is_job_completed().matches_object(None))
    }

    #[test]
    fn deployment_completed_ok() {
        use super::{is_deployment_completed, Condition};
        let depl = r#"
apiVersion: apps/v1
kind: Deployment
metadata:
  name: testapp
  namespace: default
spec:
  progressDeadlineSeconds: 600
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: test
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: test
    spec:
      containers:
      - image: postgres
        imagePullPolicy: Always
        name: postgres
        ports:
        - containerPort: 5432
          protocol: TCP
        env:
        - name: POSTGRES_PASSWORD
          value: foobar
status:
  availableReplicas: 3
  conditions:
  - lastTransitionTime: "2025-03-06T06:06:57Z"
    lastUpdateTime: "2025-03-06T06:06:57Z"
    message: Deployment has minimum availability.
    reason: MinimumReplicasAvailable
    status: "True"
    type: Available
  - lastTransitionTime: "2025-03-06T06:03:20Z"
    lastUpdateTime: "2025-03-06T06:06:57Z"
    message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
    reason: NewReplicaSetAvailable
    status: "True"
    type: Progressing
  observedGeneration: 2
  readyReplicas: 3
  replicas: 3
  updatedReplicas: 3
"#;
        let d = serde_yaml::from_str(depl).unwrap();
        assert!(is_deployment_completed().matches_object(Some(&d)))
    }

    #[test]
    fn deployment_completed_pending() {
        use super::{is_deployment_completed, Condition};
        let depl = r#"
apiVersion: apps/v1
kind: Deployment
metadata:
  name: testapp
  namespace: default
spec:
  progressDeadlineSeconds: 600
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: test
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: test
    spec:
      containers:
      - image: postgres
        imagePullPolicy: Always
        name: postgres
        ports:
        - containerPort: 5432
          protocol: TCP
        env:
        - name: POSTGRES_PASSWORD
          value: foobar
status:
  conditions:
  - lastTransitionTime: "2025-03-06T06:03:20Z"
    lastUpdateTime: "2025-03-06T06:03:20Z"
    message: Deployment does not have minimum availability.
    reason: MinimumReplicasUnavailable
    status: "False"
    type: Available
  - lastTransitionTime: "2025-03-06T06:03:20Z"
    lastUpdateTime: "2025-03-06T06:03:20Z"
    message: ReplicaSet "testapp-77789cd7d4" is progressing.
    reason: ReplicaSetUpdated
    status: "True"
    type: Progressing
  observedGeneration: 1
  replicas: 3
  unavailableReplicas: 3
  updatedReplicas: 3
"#;
        let d = serde_yaml::from_str(depl).unwrap();
        assert!(!is_deployment_completed().matches_object(Some(&d)))
    }

    #[test]
    fn deployment_completed_missing() {
        use super::{is_deployment_completed, Condition};
        assert!(!is_deployment_completed().matches_object(None))
    }

    #[test]
    fn service_lb_provisioned_ok_ip() {
        use super::{is_service_loadbalancer_provisioned, Condition};
        let service = r"
apiVersion: v1
kind: Service
metadata:
  name: test
spec:
  selector:
    app.kubernetes.io/name: test
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 80
    targetPort: 9376
  clusterIP: 10.0.171.239
status:
  loadBalancer:
    ingress:
    - ip: 192.0.2.127
";
        let s = serde_yaml::from_str(service).unwrap();
        assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
    }

    #[test]
    fn service_lb_provisioned_ok_hostname() {
        use super::{is_service_loadbalancer_provisioned, Condition};
        let service = r"
apiVersion: v1
kind: Service
metadata:
  name: test
spec:
  selector:
    app.kubernetes.io/name: test
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 80
    targetPort: 9376
  clusterIP: 10.0.171.239
status:
  loadBalancer:
    ingress:
    - hostname: example.exposed.service
";
        let s = serde_yaml::from_str(service).unwrap();
        assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
    }

    #[test]
    fn service_lb_provisioned_pending() {
        use super::{is_service_loadbalancer_provisioned, Condition};
        let service = r"
apiVersion: v1
kind: Service
metadata:
  name: test
spec:
  selector:
    app.kubernetes.io/name: test
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 80
    targetPort: 9376
  clusterIP: 10.0.171.239
status:
  loadBalancer: {}
";
        let s = serde_yaml::from_str(service).unwrap();
        assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
    }

    #[test]
    fn service_lb_provisioned_not_loadbalancer() {
        use super::{is_service_loadbalancer_provisioned, Condition};
        let service = r"
apiVersion: v1
kind: Service
metadata:
  name: test
spec:
  selector:
    app.kubernetes.io/name: test
  type: ClusterIP
  ports:
  - protocol: TCP
    port: 80
    targetPort: 9376
status:
  loadBalancer: {}
";
        let s = serde_yaml::from_str(service).unwrap();
        assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
    }

    #[test]
    fn service_lb_provisioned_missing() {
        use super::{is_service_loadbalancer_provisioned, Condition};
        assert!(!is_service_loadbalancer_provisioned().matches_object(None))
    }

    #[test]
    fn ingress_provisioned_ok_ip() {
        use super::{is_ingress_provisioned, Condition};
        let ingress = r#"
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: test
  namespace: default
  resourceVersion: "1401"
  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
spec:
  ingressClassName: nginx
  rules:
  - host: httpbin.local
    http:
      paths:
      - path: /
        backend:
          service:
            name: httpbin
            port:
              number: 80
status:
  loadBalancer:
    ingress:
    - ip: 10.89.7.3
"#;
        let i = serde_yaml::from_str(ingress).unwrap();
        assert!(is_ingress_provisioned().matches_object(Some(&i)))
    }

    #[test]
    fn ingress_provisioned_ok_hostname() {
        use super::{is_ingress_provisioned, Condition};
        let ingress = r#"
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: test
  namespace: default
  resourceVersion: "1401"
  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
spec:
  ingressClassName: nginx
  rules:
  - host: httpbin.local
    http:
      paths:
      - path: /
        backend:
          service:
            name: httpbin
            port:
              number: 80
status:
  loadBalancer:
    ingress:
    - hostname: example.exposed.service
"#;
        let i = serde_yaml::from_str(ingress).unwrap();
        assert!(is_ingress_provisioned().matches_object(Some(&i)))
    }

    #[test]
    fn ingress_provisioned_pending() {
        use super::{is_ingress_provisioned, Condition};
        let ingress = r#"
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: test
  namespace: default
  resourceVersion: "1401"
  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
spec:
  ingressClassName: nginx
  rules:
  - host: httpbin.local
    http:
      paths:
      - path: /
        backend:
          service:
            name: httpbin
            port:
              number: 80
status:
  loadBalancer: {}
"#;
        let i = serde_yaml::from_str(ingress).unwrap();
        assert!(!is_ingress_provisioned().matches_object(Some(&i)))
    }

    #[test]
    fn ingress_provisioned_missing() {
        use super::{is_ingress_provisioned, Condition};
        assert!(!is_ingress_provisioned().matches_object(None))
    }
}
}
pub mod delete {
    use super::{await_condition, conditions};
    use kube_client::{api::DeleteParams, Api, Resource};
    use serde::de::DeserializeOwned;
    use std::fmt::Debug;
    use thiserror::Error;

    /// Errors returned by [`delete_and_finalize`].
    #[derive(Debug, Error)]
    pub enum Error {
        /// Neither the returned object nor the deletion status carried a uid,
        /// so there is nothing to wait on.
        #[error("deleted object has no UID to wait for")]
        NoUid,
        /// The delete request itself failed.
        #[error("failed to delete object: {0}")]
        Delete(#[source] kube_client::Error),
        /// The watch waiting for the object to disappear failed.
        #[error("failed to wait for object to be deleted: {0}")]
        Await(#[source] super::Error),
    }

    /// Delete the object called `name` and wait until it is fully removed
    /// (i.e. any finalizers have run and the object is gone from the API).
    #[allow(clippy::module_name_repetitions)]
    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
        api: Api<K>,
        name: &str,
        delete_params: &DeleteParams,
    ) -> Result<(), Error> {
        // `delete` returns Either<K, Status>: extract the uid from whichever
        // side we got, failing with NoUid if neither provides one.
        let deleted_obj_uid = api
            .delete(name, delete_params)
            .await
            .map_err(Error::Delete)?
            .either(
                |mut obj| obj.meta_mut().uid.take(),
                |status| status.details.map(|details| details.uid),
            )
            .ok_or(Error::NoUid)?;
        // Wait until no object with that uid remains (absence or replacement
        // by a new instance both count as deleted).
        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
            .await
            .map_err(Error::Await)?;
        Ok(())
    }
}