Skip to main content

kube_runtime/
wait.rs

1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
12/// Errors from `await_condition`
13#[derive(Debug, Error)]
14pub enum Error {
15    /// The underlying watcher failed to probe the stream
16    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
17    ProbeFailed(#[source] watcher::Error),
18}
19
20/// Watch an object, and wait for some condition `cond` to return `true`.
21///
22/// `cond` is passed `Some` if the object is found, otherwise `None`.
23///
24/// The object is returned when the condition is fulfilled.
25///
26/// # Caveats
27///
28/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
29/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
30///
31/// # Errors
32///
33/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
34/// permission to `watch` and `list` it.
35///
36/// Does *not* fail if the object is not found.
37///
38/// # Usage
39///
40/// ```
41/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
42/// use kube::{Api, runtime::wait::{await_condition, conditions}};
43/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
44/// # let client: kube::Client = todo!();
45///
46/// let crds: Api<CustomResourceDefinition> = Api::all(client);
47/// // .. create or apply a crd here ..
48/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
49/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
50/// # Ok(())
51/// # }
52/// ```
53#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
54pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
55where
56    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
57{
58    // Skip updates until the condition is satisfied.
59    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
60        let matches = cond.matches_object(obj.as_ref());
61        future::ready(Ok(!matches))
62    }));
63
64    // Then take the first update that satisfies the condition.
65    let obj = stream
66        .try_next()
67        .await
68        .map_err(Error::ProbeFailed)?
69        .expect("stream must not terminate");
70    Ok(obj)
71}
72
73/// A trait for condition functions to be used by [`await_condition`]
74///
75/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
76///
77/// # Usage
78///
79/// ```
80/// use kube::runtime::wait::Condition;
81/// use k8s_openapi::api::core::v1::Pod;
82/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
83///     move |obj: Option<&Pod>| {
84///         if let Some(pod) = &obj
85///             && let Some(status) = &pod.status
86///             && let Some(conds) = &status.conditions
87///             && let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond)
88///         {
89///             return pcond.status == "True";
90///         }
91///         false
92///     }
93/// }
94/// ```
95pub trait Condition<K> {
96    /// The main trait condition that must be implemented
97    fn matches_object(&self, obj: Option<&K>) -> bool;
98
99    /// Returns a `Condition` that holds if `self` does not
100    ///
101    /// # Usage
102    ///
103    /// ```
104    /// # use kube_runtime::wait::Condition;
105    /// let condition: fn(Option<&()>) -> bool = |_| true;
106    /// assert!(condition.matches_object(None));
107    /// assert!(!condition.not().matches_object(None));
108    /// ```
109    fn not(self) -> conditions::Not<Self>
110    where
111        Self: Sized,
112    {
113        conditions::Not(self)
114    }
115
116    /// Returns a `Condition` that holds if `self` and `other` both do
117    ///
118    /// # Usage
119    ///
120    /// ```
121    /// # use kube_runtime::wait::Condition;
122    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
123    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
124    /// assert!(!cond_false.and(cond_false).matches_object(None));
125    /// assert!(!cond_false.and(cond_true).matches_object(None));
126    /// assert!(!cond_true.and(cond_false).matches_object(None));
127    /// assert!(cond_true.and(cond_true).matches_object(None));
128    /// ```
129    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
130    where
131        Self: Sized,
132    {
133        conditions::And(self, other)
134    }
135
136    /// Returns a `Condition` that holds if either `self` or `other` does
137    ///
138    /// # Usage
139    ///
140    /// ```
141    /// # use kube_runtime::wait::Condition;
142    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
143    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
144    /// assert!(!cond_false.or(cond_false).matches_object(None));
145    /// assert!(cond_false.or(cond_true).matches_object(None));
146    /// assert!(cond_true.or(cond_false).matches_object(None));
147    /// assert!(cond_true.or(cond_true).matches_object(None));
148    /// ```
149    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
150    where
151        Self: Sized,
152    {
153        conditions::Or(self, other)
154    }
155}
156
157impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
158    fn matches_object(&self, obj: Option<&K>) -> bool {
159        (self)(obj)
160    }
161}
162
163/// Common conditions to wait for
164pub mod conditions {
165    pub use super::Condition;
166    use k8s_openapi::{
167        api::{
168            apps::v1::Deployment,
169            batch::v1::Job,
170            core::v1::{Pod, Service},
171            networking::v1::Ingress,
172        },
173        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
174    };
175    use kube_client::Resource;
176
177    /// An await condition that returns `true` once the object has been deleted.
178    ///
179    /// An object is considered to be deleted if the object can no longer be found, or if its
180    /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
181    /// the deletion event and the object is recreated in the meantime.
182    #[must_use]
183    pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
184        move |obj: Option<&K>| {
185            // NB: Object not found implies success.
186            obj.is_none_or(
187                // Object is found, but a changed uid would mean that it was deleted and recreated
188                |obj| obj.meta().uid.as_deref() != Some(uid),
189            )
190        }
191    }
192
193    /// An await condition that returns `true` once the object has been created.
194    ///
195    /// This is the counterpart to [`is_deleted`](super::conditions::is_deleted).
196    #[must_use]
197    pub fn is_created<K: Resource>() -> impl Condition<K> {
198        |obj: Option<&K>| obj.is_some()
199    }
200
201    /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
202    ///
203    /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
204    /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
205    #[must_use]
206    pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
207        |obj: Option<&CustomResourceDefinition>| {
208            if let Some(o) = obj
209                && let Some(s) = &o.status
210                && let Some(conds) = &s.conditions
211                && let Some(pcond) = conds.iter().find(|c| c.type_ == "Established")
212            {
213                return pcond.status == "True";
214            }
215            false
216        }
217    }
218
219    /// An await condition for `Pod` that returns `true` once it is running
220    #[must_use]
221    pub fn is_pod_running() -> impl Condition<Pod> {
222        |obj: Option<&Pod>| {
223            if let Some(pod) = &obj
224                && let Some(status) = &pod.status
225                && let Some(phase) = &status.phase
226            {
227                return phase == "Running";
228            }
229            false
230        }
231    }
232
233    /// An await condition for `Job` that returns `true` once it is completed
234    #[must_use]
235    pub fn is_job_completed() -> impl Condition<Job> {
236        |obj: Option<&Job>| {
237            if let Some(job) = &obj
238                && let Some(s) = &job.status
239                && let Some(conds) = &s.conditions
240                && let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete")
241            {
242                return pcond.status == "True";
243            }
244            false
245        }
246    }
247
248    /// An await condition for `Deployment` that returns `true` once the latest deployment has completed
249    ///
250    /// This looks for the condition that Kubernetes sets for completed deployments:
251    /// <https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment>
252    #[must_use]
253    pub fn is_deployment_completed() -> impl Condition<Deployment> {
254        |obj: Option<&Deployment>| {
255            if let Some(depl) = &obj
256                && let Some(s) = &depl.status
257                && let Some(conds) = &s.conditions
258                && let Some(dcond) = conds.iter().find(|c| {
259                    c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
260                })
261            {
262                return dcond.status == "True";
263            }
264            false
265        }
266    }
267
268    /// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname
269    #[must_use]
270    pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
271        |obj: Option<&Service>| {
272            if let Some(svc) = &obj
273                && let Some(spec) = &svc.spec
274            {
275                // ignore services that are not type LoadBalancer (return true immediately)
276                if spec.type_ != Some("LoadBalancer".to_string()) {
277                    return true;
278                }
279                // carry on if this is a LoadBalancer service
280                if let Some(s) = &svc.status
281                    && let Some(lbs) = &s.load_balancer
282                    && let Some(ings) = &lbs.ingress
283                {
284                    return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
285                }
286            }
287            false
288        }
289    }
290
291    /// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname
292    #[must_use]
293    pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
294        |obj: Option<&Ingress>| {
295            if let Some(ing) = &obj
296                && let Some(s) = &ing.status
297                && let Some(lbs) = &s.load_balancer
298                && let Some(ings) = &lbs.ingress
299            {
300                return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
301            }
302            false
303        }
304    }
305
306    /// See [`Condition::not`]
307    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
308    pub struct Not<A>(pub(super) A);
309    impl<A: Condition<K>, K> Condition<K> for Not<A> {
310        fn matches_object(&self, obj: Option<&K>) -> bool {
311            !self.0.matches_object(obj)
312        }
313    }
314
315    /// See [`Condition::and`]
316    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
317    pub struct And<A, B>(pub(super) A, pub(super) B);
318    impl<A, B, K> Condition<K> for And<A, B>
319    where
320        A: Condition<K>,
321        B: Condition<K>,
322    {
323        fn matches_object(&self, obj: Option<&K>) -> bool {
324            self.0.matches_object(obj) && self.1.matches_object(obj)
325        }
326    }
327
328    /// See [`Condition::or`]
329    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
330    pub struct Or<A, B>(pub(super) A, pub(super) B);
331    impl<A, B, K> Condition<K> for Or<A, B>
332    where
333        A: Condition<K>,
334        B: Condition<K>,
335    {
336        fn matches_object(&self, obj: Option<&K>) -> bool {
337            self.0.matches_object(obj) || self.1.matches_object(obj)
338        }
339    }
340
341    mod tests {
342        #[test]
343        /// pass when CRD is established
344        fn crd_established_ok() {
345            use super::{Condition, is_crd_established};
346
347            let crd = r#"
348                apiVersion: apiextensions.k8s.io/v1
349                kind: CustomResourceDefinition
350                metadata:
351                  name: testthings.kube.rs
352                spec:
353                  group: kube.rs
354                  names:
355                    categories: []
356                    kind: TestThing
357                    plural: testthings
358                    shortNames: []
359                    singular: testthing
360                  scope: Namespaced
361                  versions:
362                    - additionalPrinterColumns: []
363                      name: v1
364                      schema:
365                        openAPIV3Schema:
366                          type: object
367                          x-kubernetes-preserve-unknown-fields: true
368                      served: true
369                      storage: true
370                status:
371                  acceptedNames:
372                    kind: TestThing
373                    listKind: TestThingList
374                    plural: testthings
375                    singular: testthing
376                  conditions:
377                    - lastTransitionTime: "2025-03-06T03:10:03Z"
378                      message: no conflicts found
379                      reason: NoConflicts
380                      status: "True"
381                      type: NamesAccepted
382                    - lastTransitionTime: "2025-03-06T03:10:03Z"
383                      message: the initial names have been accepted
384                      reason: InitialNamesAccepted
385                      status: "True"
386                      type: Established
387                storedVersions:
388                  - v1
389            "#;
390
391            let c = serde_saphyr::from_str(crd).unwrap();
392            assert!(is_crd_established().matches_object(Some(&c)))
393        }
394
395        #[test]
396        /// fail when CRD is not yet ready
397        fn crd_established_fail() {
398            use super::{Condition, is_crd_established};
399
400            let crd = r#"
401                apiVersion: apiextensions.k8s.io/v1
402                kind: CustomResourceDefinition
403                metadata:
404                  name: testthings.kube.rs
405                spec:
406                  group: kube.rs
407                  names:
408                    categories: []
409                    kind: TestThing
410                    plural: testthings
411                    shortNames: []
412                    singular: testthing
413                  scope: Namespaced
414                  versions:
415                    - additionalPrinterColumns: []
416                      name: v1
417                      schema:
418                        openAPIV3Schema:
419                          type: object
420                          x-kubernetes-preserve-unknown-fields: true
421                      served: true
422                      storage: true
423                status:
424                  acceptedNames:
425                    kind: TestThing
426                    listKind: TestThingList
427                    plural: testthings
428                    singular: testthing
429                  conditions:
430                    - lastTransitionTime: "2025-03-06T03:10:03Z"
431                      message: no conflicts found
432                      reason: NoConflicts
433                      status: "True"
434                      type: NamesAccepted
435                    - lastTransitionTime: "2025-03-06T03:10:03Z"
436                      message: the initial names have been accepted
437                      reason: InitialNamesAccepted
438                      status: "False"
439                      type: Established
440                storedVersions:
441                  - v1
442            "#;
443
444            let c = serde_saphyr::from_str(crd).unwrap();
445            assert!(!is_crd_established().matches_object(Some(&c)))
446        }
447
448        #[test]
449        /// fail when CRD does not exist
450        fn crd_established_missing() {
451            use super::{Condition, is_crd_established};
452
453            assert!(!is_crd_established().matches_object(None))
454        }
455
456        #[test]
457        /// pass when pod is running
458        fn pod_running_ok() {
459            use super::{Condition, is_pod_running};
460
461            let pod = r#"
462                apiVersion: v1
463                kind: Pod
464                metadata:
465                  namespace: default
466                  name: testpod
467                spec:
468                  containers:
469                    - name: testcontainer
470                      image: alpine
471                      command: [ sleep ]
472                      args: [ "100000" ]
473                status:
474                  conditions:
475                    - lastProbeTime: null
476                      lastTransitionTime: "2025-03-06T03:53:07Z"
477                      status: "True"
478                      type: PodReadyToStartContainers
479                    - lastProbeTime: null
480                      lastTransitionTime: "2025-03-06T03:52:58Z"
481                      status: "True"
482                      type: Initialized
483                    - lastProbeTime: null
484                      lastTransitionTime: "2025-03-06T03:53:24Z"
485                      status: "True"
486                      type: Ready
487                    - lastProbeTime: null
488                      lastTransitionTime: "2025-03-06T03:53:24Z"
489                      status: "True"
490                      type: ContainersReady
491                    - lastProbeTime: null
492                      lastTransitionTime: "2025-03-06T03:52:58Z"
493                      status: "True"
494                      type: PodScheduled
495                  containerStatuses:
496                    - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
497                      image: docker.io/library/alpine:latest
498                      imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
499                      lastState: {}
500                      name: testcontainer
501                      ready: true
502                      restartCount: 0
503                      started: true
504                      state:
505                        running:
506                          startedAt: "2025-03-06T03:59:20Z"
507                  phase: Running
508                  qosClass: Burstable
509            "#;
510
511            let p = serde_saphyr::from_str(pod).unwrap();
512            assert!(is_pod_running().matches_object(Some(&p)))
513        }
514
515        #[test]
516        /// fail if pod is unschedulable
517        fn pod_running_unschedulable() {
518            use super::{Condition, is_pod_running};
519
520            let pod = r#"
521                apiVersion: v1
522                kind: Pod
523                metadata:
524                  namespace: default
525                  name: testpod
526                spec:
527                  containers:
528                    - name: testcontainer
529                      image: alpine
530                      command: [ sleep ]
531                      args: [ "100000" ]
532                status:
533                  conditions:
534                    - lastProbeTime: null
535                      lastTransitionTime: "2025-03-06T03:52:25Z"
536                      message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling.'
537                      reason: Unschedulable
538                      status: "False"
539                      type: PodScheduled
540                  phase: Pending
541                  qosClass: Burstable
542            "#;
543
544            let p = serde_saphyr::from_str(pod).unwrap();
545            assert!(!is_pod_running().matches_object(Some(&p)))
546        }
547
548        #[test]
549        /// fail if pod does not exist
550        fn pod_running_missing() {
551            use super::{Condition, is_pod_running};
552
553            assert!(!is_pod_running().matches_object(None))
554        }
555
556        #[test]
557        /// pass if job completed
558        fn job_completed_ok() {
559            use super::{Condition, is_job_completed};
560
561            let job = r#"
562                apiVersion: batch/v1
563                kind: Job
564                metadata:
565                  name: pi
566                  namespace: default
567                spec:
568                  template:
569                    spec:
570                      containers:
571                      - name: pi
572                        command:
573                        - perl
574                        - -Mbignum=bpi
575                        - -wle
576                        - print bpi(2000)
577                        image: perl:5.34.0
578                        imagePullPolicy: IfNotPresent
579                status:
580                  completionTime: "2025-03-06T05:27:56Z"
581                  conditions:
582                  - lastProbeTime: "2025-03-06T05:27:56Z"
583                    lastTransitionTime: "2025-03-06T05:27:56Z"
584                    message: Reached expected number of succeeded pods
585                    reason: CompletionsReached
586                    status: "True"
587                    type: SuccessCriteriaMet
588                  - lastProbeTime: "2025-03-06T05:27:56Z"
589                    lastTransitionTime: "2025-03-06T05:27:56Z"
590                    message: Reached expected number of succeeded pods
591                    reason: CompletionsReached
592                    status: "True"
593                    type: Complete
594                  ready: 0
595                  startTime: "2025-03-06T05:27:27Z"
596                  succeeded: 1
597                  terminating: 0
598                  uncountedTerminatedPods: {}
599            "#;
600
601            let j = serde_saphyr::from_str(job).unwrap();
602            assert!(is_job_completed().matches_object(Some(&j)))
603        }
604
605        #[test]
606        /// fail if job is still in progress
607        fn job_completed_running() {
608            use super::{Condition, is_job_completed};
609
610            let job = r#"
611                apiVersion: batch/v1
612                kind: Job
613                metadata:
614                  name: pi
615                  namespace: default
616                spec:
617                  backoffLimit: 4
618                  completionMode: NonIndexed
619                  completions: 1
620                  manualSelector: false
621                  parallelism: 1
622                  template:
623                    spec:
624                      containers:
625                      - name: pi
626                        command:
627                        - perl
628                        - -Mbignum=bpi
629                        - -wle
630                        - print bpi(2000)
631                        image: perl:5.34.0
632                        imagePullPolicy: IfNotPresent
633                status:
634                  active: 1
635                  ready: 0
636                  startTime: "2025-03-06T05:27:27Z"
637                  terminating: 0
638                  uncountedTerminatedPods: {}
639            "#;
640
641            let j = serde_saphyr::from_str(job).unwrap();
642            assert!(!is_job_completed().matches_object(Some(&j)))
643        }
644
645        #[test]
646        /// fail if job does not exist
647        fn job_completed_missing() {
648            use super::{Condition, is_job_completed};
649
650            assert!(!is_job_completed().matches_object(None))
651        }
652
653        #[test]
654        /// pass when deployment has been fully rolled out
655        fn deployment_completed_ok() {
656            use super::{Condition, is_deployment_completed};
657
658            let depl = r#"
659                apiVersion: apps/v1
660                kind: Deployment
661                metadata:
662                  name: testapp
663                  namespace: default
664                spec:
665                  progressDeadlineSeconds: 600
666                  replicas: 3
667                  revisionHistoryLimit: 10
668                  selector:
669                    matchLabels:
670                      app: test
671                  strategy:
672                    rollingUpdate:
673                      maxSurge: 25%
674                      maxUnavailable: 25%
675                    type: RollingUpdate
676                  template:
677                    metadata:
678                      creationTimestamp: null
679                      labels:
680                        app: test
681                    spec:
682                      containers:
683                      - image: postgres
684                        imagePullPolicy: Always
685                        name: postgres
686                        ports:
687                        - containerPort: 5432
688                          protocol: TCP
689                        env:
690                        - name: POSTGRES_PASSWORD
691                          value: foobar
692                status:
693                  availableReplicas: 3
694                  conditions:
695                  - lastTransitionTime: "2025-03-06T06:06:57Z"
696                    lastUpdateTime: "2025-03-06T06:06:57Z"
697                    message: Deployment has minimum availability.
698                    reason: MinimumReplicasAvailable
699                    status: "True"
700                    type: Available
701                  - lastTransitionTime: "2025-03-06T06:03:20Z"
702                    lastUpdateTime: "2025-03-06T06:06:57Z"
703                    message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
704                    reason: NewReplicaSetAvailable
705                    status: "True"
706                    type: Progressing
707                  observedGeneration: 2
708                  readyReplicas: 3
709                  replicas: 3
710                  updatedReplicas: 3
711            "#;
712
713            let d = serde_saphyr::from_str(depl).unwrap();
714            assert!(is_deployment_completed().matches_object(Some(&d)))
715        }
716
717        #[test]
718        /// fail if deployment update is still rolling out
719        fn deployment_completed_pending() {
720            use super::{Condition, is_deployment_completed};
721
722            let depl = r#"
723                apiVersion: apps/v1
724                kind: Deployment
725                metadata:
726                  name: testapp
727                  namespace: default
728                spec:
729                  progressDeadlineSeconds: 600
730                  replicas: 3
731                  revisionHistoryLimit: 10
732                  selector:
733                    matchLabels:
734                      app: test
735                  strategy:
736                    rollingUpdate:
737                      maxSurge: 25%
738                      maxUnavailable: 25%
739                    type: RollingUpdate
740                  template:
741                    metadata:
742                      creationTimestamp: null
743                      labels:
744                        app: test
745                    spec:
746                      containers:
747                      - image: postgres
748                        imagePullPolicy: Always
749                        name: postgres
750                        ports:
751                        - containerPort: 5432
752                          protocol: TCP
753                        env:
754                        - name: POSTGRES_PASSWORD
755                          value: foobar
756                status:
757                  conditions:
758                  - lastTransitionTime: "2025-03-06T06:03:20Z"
759                    lastUpdateTime: "2025-03-06T06:03:20Z"
760                    message: Deployment does not have minimum availability.
761                    reason: MinimumReplicasUnavailable
762                    status: "False"
763                    type: Available
764                  - lastTransitionTime: "2025-03-06T06:03:20Z"
765                    lastUpdateTime: "2025-03-06T06:03:20Z"
766                    message: ReplicaSet "testapp-77789cd7d4" is progressing.
767                    reason: ReplicaSetUpdated
768                    status: "True"
769                    type: Progressing
770                  observedGeneration: 1
771                  replicas: 3
772                  unavailableReplicas: 3
773                  updatedReplicas: 3
774            "#;
775
776            let d = serde_saphyr::from_str(depl).unwrap();
777            assert!(!is_deployment_completed().matches_object(Some(&d)))
778        }
779
780        #[test]
781        /// fail if deployment does not exist
782        fn deployment_completed_missing() {
783            use super::{Condition, is_deployment_completed};
784
785            assert!(!is_deployment_completed().matches_object(None))
786        }
787
788        #[test]
789        /// pass if loadbalancer service has received a loadbalancer IP
790        fn service_lb_provisioned_ok_ip() {
791            use super::{Condition, is_service_loadbalancer_provisioned};
792
793            let service = r"
794                apiVersion: v1
795                kind: Service
796                metadata:
797                  name: test
798                spec:
799                  selector:
800                    app.kubernetes.io/name: test
801                  type: LoadBalancer
802                  ports:
803                    - protocol: TCP
804                      port: 80
805                      targetPort: 9376
806                  clusterIP: 10.0.171.239
807                status:
808                  loadBalancer:
809                    ingress:
810                      - ip: 192.0.2.127
811            ";
812
813            let s = serde_saphyr::from_str(service).unwrap();
814            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
815        }
816
817        #[test]
818        /// pass if loadbalancer service has received a loadbalancer hostname
819        fn service_lb_provisioned_ok_hostname() {
820            use super::{Condition, is_service_loadbalancer_provisioned};
821
822            let service = r"
823                apiVersion: v1
824                kind: Service
825                metadata:
826                  name: test
827                spec:
828                  selector:
829                    app.kubernetes.io/name: test
830                  type: LoadBalancer
831                  ports:
832                    - protocol: TCP
833                      port: 80
834                      targetPort: 9376
835                  clusterIP: 10.0.171.239
836                status:
837                  loadBalancer:
838                    ingress:
839                      - hostname: example.exposed.service
840            ";
841
842            let s = serde_saphyr::from_str(service).unwrap();
843            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
844        }
845
846        #[test]
847        /// fail if loadbalancer service is still waiting for a LB
848        fn service_lb_provisioned_pending() {
849            use super::{Condition, is_service_loadbalancer_provisioned};
850
851            let service = r"
852                apiVersion: v1
853                kind: Service
854                metadata:
855                  name: test
856                spec:
857                  selector:
858                    app.kubernetes.io/name: test
859                  type: LoadBalancer
860                  ports:
861                    - protocol: TCP
862                      port: 80
863                      targetPort: 9376
864                  clusterIP: 10.0.171.239
865                status:
866                  loadBalancer: {}
867            ";
868
869            let s = serde_saphyr::from_str(service).unwrap();
870            assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
871        }
872
873        #[test]
874        /// pass if service is not a loadbalancer
875        fn service_lb_provisioned_not_loadbalancer() {
876            use super::{Condition, is_service_loadbalancer_provisioned};
877
878            let service = r"
879                apiVersion: v1
880                kind: Service
881                metadata:
882                  name: test
883                spec:
884                  selector:
885                    app.kubernetes.io/name: test
886                  type: ClusterIP
887                  ports:
888                    - protocol: TCP
889                      port: 80
890                      targetPort: 9376
891                status:
892                  loadBalancer: {}
893            ";
894
895            let s = serde_saphyr::from_str(service).unwrap();
896            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
897        }
898
899        #[test]
900        /// fail if service does not exist
901        fn service_lb_provisioned_missing() {
902            use super::{Condition, is_service_loadbalancer_provisioned};
903
904            assert!(!is_service_loadbalancer_provisioned().matches_object(None))
905        }
906
907        #[test]
908        /// pass when ingress has received a loadbalancer IP
909        fn ingress_provisioned_ok_ip() {
910            use super::{Condition, is_ingress_provisioned};
911
912            let ingress = r#"
913                apiVersion: networking.k8s.io/v1
914                kind: Ingress
915                metadata:
916                  name: test
917                  namespace: default
918                  resourceVersion: "1401"
919                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
920                spec:
921                  ingressClassName: nginx
922                  rules:
923                  - host: httpbin.local
924                    http:
925                      paths:
926                      - path: /
927                        backend:
928                          service:
929                            name: httpbin
930                            port:
931                              number: 80
932                status:
933                  loadBalancer:
934                    ingress:
935                      - ip: 10.89.7.3
936            "#;
937
938            let i = serde_saphyr::from_str(ingress).unwrap();
939            assert!(is_ingress_provisioned().matches_object(Some(&i)))
940        }
941
942        #[test]
943        /// pass when ingress has received a loadbalancer hostname
944        fn ingress_provisioned_ok_hostname() {
945            use super::{Condition, is_ingress_provisioned};
946
947            let ingress = r#"
948                apiVersion: networking.k8s.io/v1
949                kind: Ingress
950                metadata:
951                  name: test
952                  namespace: default
953                  resourceVersion: "1401"
954                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
955                spec:
956                  ingressClassName: nginx
957                  rules:
958                  - host: httpbin.local
959                    http:
960                      paths:
961                      - path: /
962                        backend:
963                          service:
964                            name: httpbin
965                            port:
966                              number: 80
967                status:
968                  loadBalancer:
969                    ingress:
970                      - hostname: example.exposed.service
971            "#;
972
973            let i = serde_saphyr::from_str(ingress).unwrap();
974            assert!(is_ingress_provisioned().matches_object(Some(&i)))
975        }
976
977        #[test]
978        /// fail if ingress is still waiting for a LB
979        fn ingress_provisioned_pending() {
980            use super::{Condition, is_ingress_provisioned};
981
982            let ingress = r#"
983                apiVersion: networking.k8s.io/v1
984                kind: Ingress
985                metadata:
986                  name: test
987                  namespace: default
988                  resourceVersion: "1401"
989                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
990                spec:
991                  ingressClassName: nginx
992                  rules:
993                  - host: httpbin.local
994                    http:
995                      paths:
996                      - path: /
997                        backend:
998                          service:
999                            name: httpbin
1000                            port:
1001                              number: 80
1002                status:
1003                  loadBalancer: {}
1004            "#;
1005
1006            let i = serde_saphyr::from_str(ingress).unwrap();
1007            assert!(!is_ingress_provisioned().matches_object(Some(&i)))
1008        }
1009
1010        #[test]
1011        /// fail if ingress does not exist
1012        fn ingress_provisioned_missing() {
1013            use super::{Condition, is_ingress_provisioned};
1014
1015            assert!(!is_ingress_provisioned().matches_object(None))
1016        }
1017    }
1018}
1019
1020/// Utilities for deleting objects
1021pub mod delete {
1022    use super::{await_condition, conditions};
1023    use kube_client::{Api, Resource, api::DeleteParams};
1024    use serde::de::DeserializeOwned;
1025    use std::fmt::Debug;
1026    use thiserror::Error;
1027
1028    /// Errors from `delete_and_finalize`
1029    #[derive(Debug, Error)]
1030    pub enum Error {
1031        /// No uid found on metadata.uid on deleted object
1032        #[error("deleted object has no UID to wait for")]
1033        NoUid,
1034
1035        /// Apiserver returned an error to the delete call
1036        #[error("failed to delete object: {0}")]
1037        Delete(#[source] kube_client::Error),
1038
1039        /// A watcher failed to probe the object for the `is_deleted` condition
1040        #[error("failed to wait for object to be deleted: {0}")]
1041        Await(#[source] super::Error),
1042    }
1043
1044    /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
1045    ///
1046    /// # Errors
1047    ///
1048    /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
1049    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
1050        api: Api<K>,
1051        name: &str,
1052        delete_params: &DeleteParams,
1053    ) -> Result<(), Error> {
1054        let deleted_obj_uid = api
1055            .delete(name, delete_params)
1056            .await
1057            .map_err(Error::Delete)?
1058            .either(
1059                |mut obj| obj.meta_mut().uid.take(),
1060                |status| status.details.map(|details| details.uid),
1061            )
1062            .ok_or(Error::NoUid)?;
1063        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
1064            .await
1065            .map_err(Error::Await)?;
1066        Ok(())
1067    }
1068}