kube_runtime/
wait.rs

1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
/// Errors that [`await_condition`] can return.
#[derive(Debug, Error)]
pub enum Error {
    /// The underlying watch on the object failed, so the condition could not be evaluated.
    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
    ProbeFailed(#[source] watcher::Error),
}
17
18/// Watch an object, and wait for some condition `cond` to return `true`.
19///
20/// `cond` is passed `Some` if the object is found, otherwise `None`.
21///
22/// The object is returned when the condition is fulfilled.
23///
24/// # Caveats
25///
26/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
27/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
28///
29/// # Errors
30///
31/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
32/// permission to `watch` and `list` it.
33///
34/// Does *not* fail if the object is not found.
35///
36/// # Usage
37///
38/// ```
39/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40/// use kube::{Api, runtime::wait::{await_condition, conditions}};
41/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
42/// # let client: kube::Client = todo!();
43///
44/// let crds: Api<CustomResourceDefinition> = Api::all(client);
45/// // .. create or apply a crd here ..
46/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
47/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
48/// # Ok(())
49/// # }
50/// ```
51#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
52pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
53where
54    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
55{
56    // Skip updates until the condition is satisfied.
57    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
58        let matches = cond.matches_object(obj.as_ref());
59        future::ready(Ok(!matches))
60    }));
61
62    // Then take the first update that satisfies the condition.
63    let obj = stream
64        .try_next()
65        .await
66        .map_err(Error::ProbeFailed)?
67        .expect("stream must not terminate");
68    Ok(obj)
69}
70
71/// A trait for condition functions to be used by [`await_condition`]
72///
73/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
74///
75/// # Usage
76///
77/// ```
78/// use kube::runtime::wait::Condition;
79/// use k8s_openapi::api::core::v1::Pod;
80/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
81///     move |obj: Option<&Pod>| {
82///         if let Some(pod) = &obj
83///             && let Some(status) = &pod.status
84///             && let Some(conds) = &status.conditions
85///             && let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond)
86///         {
87///             return pcond.status == "True";
88///         }
89///         false
90///     }
91/// }
92/// ```
93pub trait Condition<K> {
94    fn matches_object(&self, obj: Option<&K>) -> bool;
95
96    /// Returns a `Condition` that holds if `self` does not
97    ///
98    /// # Usage
99    ///
100    /// ```
101    /// # use kube_runtime::wait::Condition;
102    /// let condition: fn(Option<&()>) -> bool = |_| true;
103    /// assert!(condition.matches_object(None));
104    /// assert!(!condition.not().matches_object(None));
105    /// ```
106    fn not(self) -> conditions::Not<Self>
107    where
108        Self: Sized,
109    {
110        conditions::Not(self)
111    }
112
113    /// Returns a `Condition` that holds if `self` and `other` both do
114    ///
115    /// # Usage
116    ///
117    /// ```
118    /// # use kube_runtime::wait::Condition;
119    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
120    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
121    /// assert!(!cond_false.and(cond_false).matches_object(None));
122    /// assert!(!cond_false.and(cond_true).matches_object(None));
123    /// assert!(!cond_true.and(cond_false).matches_object(None));
124    /// assert!(cond_true.and(cond_true).matches_object(None));
125    /// ```
126    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
127    where
128        Self: Sized,
129    {
130        conditions::And(self, other)
131    }
132
133    /// Returns a `Condition` that holds if either `self` or `other` does
134    ///
135    /// # Usage
136    ///
137    /// ```
138    /// # use kube_runtime::wait::Condition;
139    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
140    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
141    /// assert!(!cond_false.or(cond_false).matches_object(None));
142    /// assert!(cond_false.or(cond_true).matches_object(None));
143    /// assert!(cond_true.or(cond_false).matches_object(None));
144    /// assert!(cond_true.or(cond_true).matches_object(None));
145    /// ```
146    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
147    where
148        Self: Sized,
149    {
150        conditions::Or(self, other)
151    }
152}
153
154impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
155    fn matches_object(&self, obj: Option<&K>) -> bool {
156        (self)(obj)
157    }
158}
159
160/// Common conditions to wait for
161pub mod conditions {
162    pub use super::Condition;
163    use k8s_openapi::{
164        api::{
165            apps::v1::Deployment,
166            batch::v1::Job,
167            core::v1::{Pod, Service},
168            networking::v1::Ingress,
169        },
170        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
171    };
172    use kube_client::Resource;
173
174    /// An await condition that returns `true` once the object has been deleted.
175    ///
176    /// An object is considered to be deleted if the object can no longer be found, or if its
177    /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
178    /// the deletion event and the object is recreated in the meantime.
179    #[must_use]
180    pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
181        move |obj: Option<&K>| {
182            // NB: Object not found implies success.
183            obj.is_none_or(
184                // Object is found, but a changed uid would mean that it was deleted and recreated
185                |obj| obj.meta().uid.as_deref() != Some(uid),
186            )
187        }
188    }
189
190    /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
191    ///
192    /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
193    /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
194    #[must_use]
195    pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
196        |obj: Option<&CustomResourceDefinition>| {
197            if let Some(o) = obj
198                && let Some(s) = &o.status
199                && let Some(conds) = &s.conditions
200                && let Some(pcond) = conds.iter().find(|c| c.type_ == "Established")
201            {
202                return pcond.status == "True";
203            }
204            false
205        }
206    }
207
208    /// An await condition for `Pod` that returns `true` once it is running
209    #[must_use]
210    pub fn is_pod_running() -> impl Condition<Pod> {
211        |obj: Option<&Pod>| {
212            if let Some(pod) = &obj
213                && let Some(status) = &pod.status
214                && let Some(phase) = &status.phase
215            {
216                return phase == "Running";
217            }
218            false
219        }
220    }
221
222    /// An await condition for `Job` that returns `true` once it is completed
223    #[must_use]
224    pub fn is_job_completed() -> impl Condition<Job> {
225        |obj: Option<&Job>| {
226            if let Some(job) = &obj
227                && let Some(s) = &job.status
228                && let Some(conds) = &s.conditions
229                && let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete")
230            {
231                return pcond.status == "True";
232            }
233            false
234        }
235    }
236
237    /// An await condition for `Deployment` that returns `true` once the latest deployment has completed
238    ///
239    /// This looks for the condition that Kubernetes sets for completed deployments:
240    /// <https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment>
241    #[must_use]
242    pub fn is_deployment_completed() -> impl Condition<Deployment> {
243        |obj: Option<&Deployment>| {
244            if let Some(depl) = &obj
245                && let Some(s) = &depl.status
246                && let Some(conds) = &s.conditions
247                && let Some(dcond) = conds.iter().find(|c| {
248                    c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
249                })
250            {
251                return dcond.status == "True";
252            }
253            false
254        }
255    }
256
257    /// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname
258    #[must_use]
259    pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
260        |obj: Option<&Service>| {
261            if let Some(svc) = &obj
262                && let Some(spec) = &svc.spec
263            {
264                // ignore services that are not type LoadBalancer (return true immediately)
265                if spec.type_ != Some("LoadBalancer".to_string()) {
266                    return true;
267                }
268                // carry on if this is a LoadBalancer service
269                if let Some(s) = &svc.status
270                    && let Some(lbs) = &s.load_balancer
271                    && let Some(ings) = &lbs.ingress
272                {
273                    return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
274                }
275            }
276            false
277        }
278    }
279
280    /// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname
281    #[must_use]
282    pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
283        |obj: Option<&Ingress>| {
284            if let Some(ing) = &obj
285                && let Some(s) = &ing.status
286                && let Some(lbs) = &s.load_balancer
287                && let Some(ings) = &lbs.ingress
288            {
289                return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
290            }
291            false
292        }
293    }
294
295    /// See [`Condition::not`]
296    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
297    pub struct Not<A>(pub(super) A);
298    impl<A: Condition<K>, K> Condition<K> for Not<A> {
299        fn matches_object(&self, obj: Option<&K>) -> bool {
300            !self.0.matches_object(obj)
301        }
302    }
303
304    /// See [`Condition::and`]
305    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
306    pub struct And<A, B>(pub(super) A, pub(super) B);
307    impl<A, B, K> Condition<K> for And<A, B>
308    where
309        A: Condition<K>,
310        B: Condition<K>,
311    {
312        fn matches_object(&self, obj: Option<&K>) -> bool {
313            self.0.matches_object(obj) && self.1.matches_object(obj)
314        }
315    }
316
317    /// See [`Condition::or`]
318    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
319    pub struct Or<A, B>(pub(super) A, pub(super) B);
320    impl<A, B, K> Condition<K> for Or<A, B>
321    where
322        A: Condition<K>,
323        B: Condition<K>,
324    {
325        fn matches_object(&self, obj: Option<&K>) -> bool {
326            self.0.matches_object(obj) || self.1.matches_object(obj)
327        }
328    }
329
330    mod tests {
331        #[test]
332        /// pass when CRD is established
333        fn crd_established_ok() {
334            use super::{Condition, is_crd_established};
335
336            let crd = r#"
337                apiVersion: apiextensions.k8s.io/v1
338                kind: CustomResourceDefinition
339                metadata:
340                  name: testthings.kube.rs
341                spec:
342                  group: kube.rs
343                  names:
344                    categories: []
345                    kind: TestThing
346                    plural: testthings
347                    shortNames: []
348                    singular: testthing
349                  scope: Namespaced
350                  versions:
351                    - additionalPrinterColumns: []
352                      name: v1
353                      schema:
354                        openAPIV3Schema:
355                          type: object
356                          x-kubernetes-preserve-unknown-fields: true
357                      served: true
358                      storage: true
359                status:
360                  acceptedNames:
361                    kind: TestThing
362                    listKind: TestThingList
363                    plural: testthings
364                    singular: testthing
365                  conditions:
366                    - lastTransitionTime: "2025-03-06T03:10:03Z"
367                      message: no conflicts found
368                      reason: NoConflicts
369                      status: "True"
370                      type: NamesAccepted
371                    - lastTransitionTime: "2025-03-06T03:10:03Z"
372                      message: the initial names have been accepted
373                      reason: InitialNamesAccepted
374                      status: "True"
375                      type: Established
376                storedVersions:
377                  - v1
378            "#;
379
380            let c = serde_yaml::from_str(crd).unwrap();
381            assert!(is_crd_established().matches_object(Some(&c)))
382        }
383
384        #[test]
385        /// fail when CRD is not yet ready
386        fn crd_established_fail() {
387            use super::{Condition, is_crd_established};
388
389            let crd = r#"
390                apiVersion: apiextensions.k8s.io/v1
391                kind: CustomResourceDefinition
392                metadata:
393                  name: testthings.kube.rs
394                spec:
395                  group: kube.rs
396                  names:
397                    categories: []
398                    kind: TestThing
399                    plural: testthings
400                    shortNames: []
401                    singular: testthing
402                  scope: Namespaced
403                  versions:
404                    - additionalPrinterColumns: []
405                      name: v1
406                      schema:
407                        openAPIV3Schema:
408                          type: object
409                          x-kubernetes-preserve-unknown-fields: true
410                      served: true
411                      storage: true
412                status:
413                  acceptedNames:
414                    kind: TestThing
415                    listKind: TestThingList
416                    plural: testthings
417                    singular: testthing
418                  conditions:
419                    - lastTransitionTime: "2025-03-06T03:10:03Z"
420                      message: no conflicts found
421                      reason: NoConflicts
422                      status: "True"
423                      type: NamesAccepted
424                    - lastTransitionTime: "2025-03-06T03:10:03Z"
425                      message: the initial names have been accepted
426                      reason: InitialNamesAccepted
427                      status: "False"
428                      type: Established
429                storedVersions:
430                  - v1
431            "#;
432
433            let c = serde_yaml::from_str(crd).unwrap();
434            assert!(!is_crd_established().matches_object(Some(&c)))
435        }
436
437        #[test]
438        /// fail when CRD does not exist
439        fn crd_established_missing() {
440            use super::{Condition, is_crd_established};
441
442            assert!(!is_crd_established().matches_object(None))
443        }
444
445        #[test]
446        /// pass when pod is running
447        fn pod_running_ok() {
448            use super::{Condition, is_pod_running};
449
450            let pod = r#"
451                apiVersion: v1
452                kind: Pod
453                metadata:
454                  namespace: default
455                  name: testpod
456                spec:
457                  containers:
458                    - name: testcontainer
459                      image: alpine
460                      command: [ sleep ]
461                      args: [ "100000" ]
462                status:
463                  conditions:
464                    - lastProbeTime: null
465                      lastTransitionTime: "2025-03-06T03:53:07Z"
466                      status: "True"
467                      type: PodReadyToStartContainers
468                    - lastProbeTime: null
469                      lastTransitionTime: "2025-03-06T03:52:58Z"
470                      status: "True"
471                      type: Initialized
472                    - lastProbeTime: null
473                      lastTransitionTime: "2025-03-06T03:53:24Z"
474                      status: "True"
475                      type: Ready
476                    - lastProbeTime: null
477                      lastTransitionTime: "2025-03-06T03:53:24Z"
478                      status: "True"
479                      type: ContainersReady
480                    - lastProbeTime: null
481                      lastTransitionTime: "2025-03-06T03:52:58Z"
482                      status: "True"
483                      type: PodScheduled
484                  containerStatuses:
485                    - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
486                      image: docker.io/library/alpine:latest
487                      imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
488                      lastState: {}
489                      name: testcontainer
490                      ready: true
491                      restartCount: 0
492                      started: true
493                      state:
494                        running:
495                          startedAt: "2025-03-06T03:59:20Z"
496                  phase: Running
497                  qosClass: Burstable
498            "#;
499
500            let p = serde_yaml::from_str(pod).unwrap();
501            assert!(is_pod_running().matches_object(Some(&p)))
502        }
503
504        #[test]
505        /// fail if pod is unschedulable
506        fn pod_running_unschedulable() {
507            use super::{Condition, is_pod_running};
508
509            let pod = r#"
510                apiVersion: v1
511                kind: Pod
512                metadata:
513                  namespace: default
514                  name: testpod
515                spec:
516                  containers:
517                    - name: testcontainer
518                      image: alpine
519                      command: [ sleep ]
520                      args: [ "100000" ]
521                status:
522                  conditions:
523                    - lastProbeTime: null
524                      lastTransitionTime: "2025-03-06T03:52:25Z"
525                      message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1
526                      nodes are available: 1 Preemption is not helpful for scheduling.'
527                      reason: Unschedulable
528                      status: "False"
529                      type: PodScheduled
530                  phase: Pending
531                  qosClass: Burstable
532            "#;
533
534            let p = serde_yaml::from_str(pod).unwrap();
535            assert!(!is_pod_running().matches_object(Some(&p)))
536        }
537
538        #[test]
539        /// fail if pod does not exist
540        fn pod_running_missing() {
541            use super::{Condition, is_pod_running};
542
543            assert!(!is_pod_running().matches_object(None))
544        }
545
546        #[test]
547        /// pass if job completed
548        fn job_completed_ok() {
549            use super::{Condition, is_job_completed};
550
551            let job = r#"
552                apiVersion: batch/v1
553                kind: Job
554                metadata:
555                  name: pi
556                  namespace: default
557                spec:
558                  template:
559                    spec:
560                      containers:
561                      - name: pi
562                        command:
563                        - perl
564                        - -Mbignum=bpi
565                        - -wle
566                        - print bpi(2000)
567                        image: perl:5.34.0
568                        imagePullPolicy: IfNotPresent
569                status:
570                  completionTime: "2025-03-06T05:27:56Z"
571                  conditions:
572                  - lastProbeTime: "2025-03-06T05:27:56Z"
573                    lastTransitionTime: "2025-03-06T05:27:56Z"
574                    message: Reached expected number of succeeded pods
575                    reason: CompletionsReached
576                    status: "True"
577                    type: SuccessCriteriaMet
578                  - lastProbeTime: "2025-03-06T05:27:56Z"
579                    lastTransitionTime: "2025-03-06T05:27:56Z"
580                    message: Reached expected number of succeeded pods
581                    reason: CompletionsReached
582                    status: "True"
583                    type: Complete
584                  ready: 0
585                  startTime: "2025-03-06T05:27:27Z"
586                  succeeded: 1
587                  terminating: 0
588                  uncountedTerminatedPods: {}
589            "#;
590
591            let j = serde_yaml::from_str(job).unwrap();
592            assert!(is_job_completed().matches_object(Some(&j)))
593        }
594
595        #[test]
596        /// fail if job is still in progress
597        fn job_completed_running() {
598            use super::{Condition, is_job_completed};
599
600            let job = r#"
601                apiVersion: batch/v1
602                kind: Job
603                metadata:
604                  name: pi
605                  namespace: default
606                spec:
607                  backoffLimit: 4
608                  completionMode: NonIndexed
609                  completions: 1
610                  manualSelector: false
611                  parallelism: 1
612                  template:
613                    spec:
614                      containers:
615                      - name: pi
616                        command:
617                        - perl
618                        - -Mbignum=bpi
619                        - -wle
620                        - print bpi(2000)
621                        image: perl:5.34.0
622                        imagePullPolicy: IfNotPresent
623                status:
624                  active: 1
625                  ready: 0
626                  startTime: "2025-03-06T05:27:27Z"
627                  terminating: 0
628                  uncountedTerminatedPods: {}
629            "#;
630
631            let j = serde_yaml::from_str(job).unwrap();
632            assert!(!is_job_completed().matches_object(Some(&j)))
633        }
634
635        #[test]
636        /// fail if job does not exist
637        fn job_completed_missing() {
638            use super::{Condition, is_job_completed};
639
640            assert!(!is_job_completed().matches_object(None))
641        }
642
643        #[test]
644        /// pass when deployment has been fully rolled out
645        fn deployment_completed_ok() {
646            use super::{Condition, is_deployment_completed};
647
648            let depl = r#"
649                apiVersion: apps/v1
650                kind: Deployment
651                metadata:
652                  name: testapp
653                  namespace: default
654                spec:
655                  progressDeadlineSeconds: 600
656                  replicas: 3
657                  revisionHistoryLimit: 10
658                  selector:
659                    matchLabels:
660                      app: test
661                  strategy:
662                    rollingUpdate:
663                      maxSurge: 25%
664                      maxUnavailable: 25%
665                    type: RollingUpdate
666                  template:
667                    metadata:
668                      creationTimestamp: null
669                      labels:
670                        app: test
671                    spec:
672                      containers:
673                      - image: postgres
674                        imagePullPolicy: Always
675                        name: postgres
676                        ports:
677                        - containerPort: 5432
678                          protocol: TCP
679                        env:
680                        - name: POSTGRES_PASSWORD
681                          value: foobar
682                status:
683                  availableReplicas: 3
684                  conditions:
685                  - lastTransitionTime: "2025-03-06T06:06:57Z"
686                    lastUpdateTime: "2025-03-06T06:06:57Z"
687                    message: Deployment has minimum availability.
688                    reason: MinimumReplicasAvailable
689                    status: "True"
690                    type: Available
691                  - lastTransitionTime: "2025-03-06T06:03:20Z"
692                    lastUpdateTime: "2025-03-06T06:06:57Z"
693                    message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
694                    reason: NewReplicaSetAvailable
695                    status: "True"
696                    type: Progressing
697                  observedGeneration: 2
698                  readyReplicas: 3
699                  replicas: 3
700                  updatedReplicas: 3
701            "#;
702
703            let d = serde_yaml::from_str(depl).unwrap();
704            assert!(is_deployment_completed().matches_object(Some(&d)))
705        }
706
707        #[test]
708        /// fail if deployment update is still rolling out
709        fn deployment_completed_pending() {
710            use super::{Condition, is_deployment_completed};
711
712            let depl = r#"
713                apiVersion: apps/v1
714                kind: Deployment
715                metadata:
716                  name: testapp
717                  namespace: default
718                spec:
719                  progressDeadlineSeconds: 600
720                  replicas: 3
721                  revisionHistoryLimit: 10
722                  selector:
723                    matchLabels:
724                      app: test
725                  strategy:
726                    rollingUpdate:
727                      maxSurge: 25%
728                      maxUnavailable: 25%
729                    type: RollingUpdate
730                  template:
731                    metadata:
732                      creationTimestamp: null
733                      labels:
734                        app: test
735                    spec:
736                      containers:
737                      - image: postgres
738                        imagePullPolicy: Always
739                        name: postgres
740                        ports:
741                        - containerPort: 5432
742                          protocol: TCP
743                        env:
744                        - name: POSTGRES_PASSWORD
745                          value: foobar
746                status:
747                  conditions:
748                  - lastTransitionTime: "2025-03-06T06:03:20Z"
749                    lastUpdateTime: "2025-03-06T06:03:20Z"
750                    message: Deployment does not have minimum availability.
751                    reason: MinimumReplicasUnavailable
752                    status: "False"
753                    type: Available
754                  - lastTransitionTime: "2025-03-06T06:03:20Z"
755                    lastUpdateTime: "2025-03-06T06:03:20Z"
756                    message: ReplicaSet "testapp-77789cd7d4" is progressing.
757                    reason: ReplicaSetUpdated
758                    status: "True"
759                    type: Progressing
760                  observedGeneration: 1
761                  replicas: 3
762                  unavailableReplicas: 3
763                  updatedReplicas: 3
764            "#;
765
766            let d = serde_yaml::from_str(depl).unwrap();
767            assert!(!is_deployment_completed().matches_object(Some(&d)))
768        }
769
770        #[test]
771        /// fail if deployment does not exist
772        fn deployment_completed_missing() {
773            use super::{Condition, is_deployment_completed};
774
775            assert!(!is_deployment_completed().matches_object(None))
776        }
777
778        #[test]
779        /// pass if loadbalancer service has recieved a loadbalancer IP
780        fn service_lb_provisioned_ok_ip() {
781            use super::{Condition, is_service_loadbalancer_provisioned};
782
783            let service = r"
784                apiVersion: v1
785                kind: Service
786                metadata:
787                  name: test
788                spec:
789                  selector:
790                    app.kubernetes.io/name: test
791                  type: LoadBalancer
792                  ports:
793                    - protocol: TCP
794                      port: 80
795                      targetPort: 9376
796                  clusterIP: 10.0.171.239
797                status:
798                  loadBalancer:
799                    ingress:
800                      - ip: 192.0.2.127
801            ";
802
803            let s = serde_yaml::from_str(service).unwrap();
804            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
805        }
806
807        #[test]
808        /// pass if loadbalancer service has recieved a loadbalancer hostname
809        fn service_lb_provisioned_ok_hostname() {
810            use super::{Condition, is_service_loadbalancer_provisioned};
811
812            let service = r"
813                apiVersion: v1
814                kind: Service
815                metadata:
816                  name: test
817                spec:
818                  selector:
819                    app.kubernetes.io/name: test
820                  type: LoadBalancer
821                  ports:
822                    - protocol: TCP
823                      port: 80
824                      targetPort: 9376
825                  clusterIP: 10.0.171.239
826                status:
827                  loadBalancer:
828                    ingress:
829                      - hostname: example.exposed.service
830            ";
831
832            let s = serde_yaml::from_str(service).unwrap();
833            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
834        }
835
836        #[test]
837        /// fail if loadbalancer service is still waiting for a LB
838        fn service_lb_provisioned_pending() {
839            use super::{Condition, is_service_loadbalancer_provisioned};
840
841            let service = r"
842                apiVersion: v1
843                kind: Service
844                metadata:
845                  name: test
846                spec:
847                  selector:
848                    app.kubernetes.io/name: test
849                  type: LoadBalancer
850                  ports:
851                    - protocol: TCP
852                      port: 80
853                      targetPort: 9376
854                  clusterIP: 10.0.171.239
855                status:
856                  loadBalancer: {}
857            ";
858
859            let s = serde_yaml::from_str(service).unwrap();
860            assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
861        }
862
863        #[test]
864        /// pass if service is not a loadbalancer
865        fn service_lb_provisioned_not_loadbalancer() {
866            use super::{Condition, is_service_loadbalancer_provisioned};
867
868            let service = r"
869                apiVersion: v1
870                kind: Service
871                metadata:
872                  name: test
873                spec:
874                  selector:
875                    app.kubernetes.io/name: test
876                  type: ClusterIP
877                  ports:
878                    - protocol: TCP
879                      port: 80
880                      targetPort: 9376
881                status:
882                  loadBalancer: {}
883            ";
884
885            let s = serde_yaml::from_str(service).unwrap();
886            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
887        }
888
889        #[test]
890        /// fail if service does not exist
891        fn service_lb_provisioned_missing() {
892            use super::{Condition, is_service_loadbalancer_provisioned};
893
894            assert!(!is_service_loadbalancer_provisioned().matches_object(None))
895        }
896
897        #[test]
898        /// pass when ingress has recieved a loadbalancer IP
899        fn ingress_provisioned_ok_ip() {
900            use super::{Condition, is_ingress_provisioned};
901
902            let ingress = r#"
903                apiVersion: networking.k8s.io/v1
904                kind: Ingress
905                metadata:
906                  name: test
907                  namespace: default
908                  resourceVersion: "1401"
909                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
910                spec:
911                  ingressClassName: nginx
912                  rules:
913                  - host: httpbin.local
914                    http:
915                      paths:
916                      - path: /
917                        backend:
918                          service:
919                            name: httpbin
920                            port:
921                              number: 80
922                status:
923                  loadBalancer:
924                    ingress:
925                      - ip: 10.89.7.3
926            "#;
927
928            let i = serde_yaml::from_str(ingress).unwrap();
929            assert!(is_ingress_provisioned().matches_object(Some(&i)))
930        }
931
932        #[test]
933        /// pass when ingress has recieved a loadbalancer hostname
934        fn ingress_provisioned_ok_hostname() {
935            use super::{Condition, is_ingress_provisioned};
936
937            let ingress = r#"
938                apiVersion: networking.k8s.io/v1
939                kind: Ingress
940                metadata:
941                  name: test
942                  namespace: default
943                  resourceVersion: "1401"
944                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
945                spec:
946                  ingressClassName: nginx
947                  rules:
948                  - host: httpbin.local
949                    http:
950                      paths:
951                      - path: /
952                        backend:
953                          service:
954                            name: httpbin
955                            port:
956                              number: 80
957                status:
958                  loadBalancer:
959                    ingress:
960                      - hostname: example.exposed.service
961            "#;
962
963            let i = serde_yaml::from_str(ingress).unwrap();
964            assert!(is_ingress_provisioned().matches_object(Some(&i)))
965        }
966
967        #[test]
968        /// fail if ingress is still waiting for a LB
969        fn ingress_provisioned_pending() {
970            use super::{Condition, is_ingress_provisioned};
971
972            let ingress = r#"
973                apiVersion: networking.k8s.io/v1
974                kind: Ingress
975                metadata:
976                  name: test
977                  namespace: default
978                  resourceVersion: "1401"
979                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
980                spec:
981                  ingressClassName: nginx
982                  rules:
983                  - host: httpbin.local
984                    http:
985                      paths:
986                      - path: /
987                        backend:
988                          service:
989                            name: httpbin
990                            port:
991                              number: 80
992                status:
993                  loadBalancer: {}
994            "#;
995
996            let i = serde_yaml::from_str(ingress).unwrap();
997            assert!(!is_ingress_provisioned().matches_object(Some(&i)))
998        }
999
1000        #[test]
1001        /// fail if ingress does not exist
1002        fn ingress_provisioned_missing() {
1003            use super::{Condition, is_ingress_provisioned};
1004
1005            assert!(!is_ingress_provisioned().matches_object(None))
1006        }
1007    }
1008}
1009
1010/// Utilities for deleting objects
1011pub mod delete {
1012    use super::{await_condition, conditions};
1013    use kube_client::{Api, Resource, api::DeleteParams};
1014    use serde::de::DeserializeOwned;
1015    use std::fmt::Debug;
1016    use thiserror::Error;
1017
1018    #[derive(Debug, Error)]
1019    pub enum Error {
1020        #[error("deleted object has no UID to wait for")]
1021        NoUid,
1022        #[error("failed to delete object: {0}")]
1023        Delete(#[source] kube_client::Error),
1024        #[error("failed to wait for object to be deleted: {0}")]
1025        Await(#[source] super::Error),
1026    }
1027
1028    /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
1029    ///
1030    /// # Errors
1031    ///
1032    /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
1033    #[allow(clippy::module_name_repetitions)]
1034    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
1035        api: Api<K>,
1036        name: &str,
1037        delete_params: &DeleteParams,
1038    ) -> Result<(), Error> {
1039        let deleted_obj_uid = api
1040            .delete(name, delete_params)
1041            .await
1042            .map_err(Error::Delete)?
1043            .either(
1044                |mut obj| obj.meta_mut().uid.take(),
1045                |status| status.details.map(|details| details.uid),
1046            )
1047            .ok_or(Error::NoUid)?;
1048        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
1049            .await
1050            .map_err(Error::Await)?;
1051        Ok(())
1052    }
1053}