Skip to main content

kube_runtime/
wait.rs

1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
12/// Errors from `await_condition`
13#[derive(Debug, Error)]
14pub enum Error {
15    /// The underlying watcher failed to probe the stream
16    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
17    ProbeFailed(#[source] watcher::Error),
18}
19
20/// Watch an object, and wait for some condition `cond` to return `true`.
21///
22/// `cond` is passed `Some` if the object is found, otherwise `None`.
23///
24/// The object is returned when the condition is fulfilled.
25///
26/// # Caveats
27///
28/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
29/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
30///
31/// # Errors
32///
33/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
34/// permission to `watch` and `list` it.
35///
36/// Does *not* fail if the object is not found.
37///
38/// # Usage
39///
40/// ```
41/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
42/// use kube::{Api, runtime::wait::{await_condition, conditions}};
43/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
44/// # let client: kube::Client = todo!();
45///
46/// let crds: Api<CustomResourceDefinition> = Api::all(client);
47/// // .. create or apply a crd here ..
48/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
49/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
50/// # Ok(())
51/// # }
52/// ```
53#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
54pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
55where
56    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
57{
58    // Skip updates until the condition is satisfied.
59    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
60        let matches = cond.matches_object(obj.as_ref());
61        future::ready(Ok(!matches))
62    }));
63
64    // Then take the first update that satisfies the condition.
65    let obj = stream
66        .try_next()
67        .await
68        .map_err(Error::ProbeFailed)?
69        .expect("stream must not terminate");
70    Ok(obj)
71}
72
73/// A trait for condition functions to be used by [`await_condition`]
74///
75/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
76///
77/// # Usage
78///
79/// ```
80/// use kube::runtime::wait::Condition;
81/// use k8s_openapi::api::core::v1::Pod;
82/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
83///     move |obj: Option<&Pod>| {
84///         if let Some(pod) = &obj
85///             && let Some(status) = &pod.status
86///             && let Some(conds) = &status.conditions
87///             && let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond)
88///         {
89///             return pcond.status == "True";
90///         }
91///         false
92///     }
93/// }
94/// ```
95pub trait Condition<K> {
96    /// The main trait condition that must be implemented
97    fn matches_object(&self, obj: Option<&K>) -> bool;
98
99    /// Returns a `Condition` that holds if `self` does not
100    ///
101    /// # Usage
102    ///
103    /// ```
104    /// # use kube_runtime::wait::Condition;
105    /// let condition: fn(Option<&()>) -> bool = |_| true;
106    /// assert!(condition.matches_object(None));
107    /// assert!(!condition.not().matches_object(None));
108    /// ```
109    fn not(self) -> conditions::Not<Self>
110    where
111        Self: Sized,
112    {
113        conditions::Not(self)
114    }
115
116    /// Returns a `Condition` that holds if `self` and `other` both do
117    ///
118    /// # Usage
119    ///
120    /// ```
121    /// # use kube_runtime::wait::Condition;
122    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
123    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
124    /// assert!(!cond_false.and(cond_false).matches_object(None));
125    /// assert!(!cond_false.and(cond_true).matches_object(None));
126    /// assert!(!cond_true.and(cond_false).matches_object(None));
127    /// assert!(cond_true.and(cond_true).matches_object(None));
128    /// ```
129    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
130    where
131        Self: Sized,
132    {
133        conditions::And(self, other)
134    }
135
136    /// Returns a `Condition` that holds if either `self` or `other` does
137    ///
138    /// # Usage
139    ///
140    /// ```
141    /// # use kube_runtime::wait::Condition;
142    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
143    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
144    /// assert!(!cond_false.or(cond_false).matches_object(None));
145    /// assert!(cond_false.or(cond_true).matches_object(None));
146    /// assert!(cond_true.or(cond_false).matches_object(None));
147    /// assert!(cond_true.or(cond_true).matches_object(None));
148    /// ```
149    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
150    where
151        Self: Sized,
152    {
153        conditions::Or(self, other)
154    }
155}
156
157impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
158    fn matches_object(&self, obj: Option<&K>) -> bool {
159        (self)(obj)
160    }
161}
162
163/// Common conditions to wait for
164pub mod conditions {
165    pub use super::Condition;
166    use k8s_openapi::{
167        api::{
168            apps::v1::Deployment,
169            batch::v1::Job,
170            core::v1::{Pod, Service},
171            networking::v1::Ingress,
172        },
173        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
174    };
175    use kube_client::Resource;
176
177    /// An await condition that returns `true` once the object has been deleted.
178    ///
179    /// An object is considered to be deleted if the object can no longer be found, or if its
180    /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
181    /// the deletion event and the object is recreated in the meantime.
182    #[must_use]
183    pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
184        move |obj: Option<&K>| {
185            // NB: Object not found implies success.
186            obj.is_none_or(
187                // Object is found, but a changed uid would mean that it was deleted and recreated
188                |obj| obj.meta().uid.as_deref() != Some(uid),
189            )
190        }
191    }
192
193    /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
194    ///
195    /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
196    /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
197    #[must_use]
198    pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
199        |obj: Option<&CustomResourceDefinition>| {
200            if let Some(o) = obj
201                && let Some(s) = &o.status
202                && let Some(conds) = &s.conditions
203                && let Some(pcond) = conds.iter().find(|c| c.type_ == "Established")
204            {
205                return pcond.status == "True";
206            }
207            false
208        }
209    }
210
211    /// An await condition for `Pod` that returns `true` once it is running
212    #[must_use]
213    pub fn is_pod_running() -> impl Condition<Pod> {
214        |obj: Option<&Pod>| {
215            if let Some(pod) = &obj
216                && let Some(status) = &pod.status
217                && let Some(phase) = &status.phase
218            {
219                return phase == "Running";
220            }
221            false
222        }
223    }
224
225    /// An await condition for `Job` that returns `true` once it is completed
226    #[must_use]
227    pub fn is_job_completed() -> impl Condition<Job> {
228        |obj: Option<&Job>| {
229            if let Some(job) = &obj
230                && let Some(s) = &job.status
231                && let Some(conds) = &s.conditions
232                && let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete")
233            {
234                return pcond.status == "True";
235            }
236            false
237        }
238    }
239
240    /// An await condition for `Deployment` that returns `true` once the latest deployment has completed
241    ///
242    /// This looks for the condition that Kubernetes sets for completed deployments:
243    /// <https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment>
244    #[must_use]
245    pub fn is_deployment_completed() -> impl Condition<Deployment> {
246        |obj: Option<&Deployment>| {
247            if let Some(depl) = &obj
248                && let Some(s) = &depl.status
249                && let Some(conds) = &s.conditions
250                && let Some(dcond) = conds.iter().find(|c| {
251                    c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
252                })
253            {
254                return dcond.status == "True";
255            }
256            false
257        }
258    }
259
260    /// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname
261    #[must_use]
262    pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
263        |obj: Option<&Service>| {
264            if let Some(svc) = &obj
265                && let Some(spec) = &svc.spec
266            {
267                // ignore services that are not type LoadBalancer (return true immediately)
268                if spec.type_ != Some("LoadBalancer".to_string()) {
269                    return true;
270                }
271                // carry on if this is a LoadBalancer service
272                if let Some(s) = &svc.status
273                    && let Some(lbs) = &s.load_balancer
274                    && let Some(ings) = &lbs.ingress
275                {
276                    return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
277                }
278            }
279            false
280        }
281    }
282
283    /// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname
284    #[must_use]
285    pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
286        |obj: Option<&Ingress>| {
287            if let Some(ing) = &obj
288                && let Some(s) = &ing.status
289                && let Some(lbs) = &s.load_balancer
290                && let Some(ings) = &lbs.ingress
291            {
292                return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
293            }
294            false
295        }
296    }
297
298    /// See [`Condition::not`]
299    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
300    pub struct Not<A>(pub(super) A);
301    impl<A: Condition<K>, K> Condition<K> for Not<A> {
302        fn matches_object(&self, obj: Option<&K>) -> bool {
303            !self.0.matches_object(obj)
304        }
305    }
306
307    /// See [`Condition::and`]
308    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
309    pub struct And<A, B>(pub(super) A, pub(super) B);
310    impl<A, B, K> Condition<K> for And<A, B>
311    where
312        A: Condition<K>,
313        B: Condition<K>,
314    {
315        fn matches_object(&self, obj: Option<&K>) -> bool {
316            self.0.matches_object(obj) && self.1.matches_object(obj)
317        }
318    }
319
320    /// See [`Condition::or`]
321    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
322    pub struct Or<A, B>(pub(super) A, pub(super) B);
323    impl<A, B, K> Condition<K> for Or<A, B>
324    where
325        A: Condition<K>,
326        B: Condition<K>,
327    {
328        fn matches_object(&self, obj: Option<&K>) -> bool {
329            self.0.matches_object(obj) || self.1.matches_object(obj)
330        }
331    }
332
333    mod tests {
334        #[test]
335        /// pass when CRD is established
336        fn crd_established_ok() {
337            use super::{Condition, is_crd_established};
338
339            let crd = r#"
340                apiVersion: apiextensions.k8s.io/v1
341                kind: CustomResourceDefinition
342                metadata:
343                  name: testthings.kube.rs
344                spec:
345                  group: kube.rs
346                  names:
347                    categories: []
348                    kind: TestThing
349                    plural: testthings
350                    shortNames: []
351                    singular: testthing
352                  scope: Namespaced
353                  versions:
354                    - additionalPrinterColumns: []
355                      name: v1
356                      schema:
357                        openAPIV3Schema:
358                          type: object
359                          x-kubernetes-preserve-unknown-fields: true
360                      served: true
361                      storage: true
362                status:
363                  acceptedNames:
364                    kind: TestThing
365                    listKind: TestThingList
366                    plural: testthings
367                    singular: testthing
368                  conditions:
369                    - lastTransitionTime: "2025-03-06T03:10:03Z"
370                      message: no conflicts found
371                      reason: NoConflicts
372                      status: "True"
373                      type: NamesAccepted
374                    - lastTransitionTime: "2025-03-06T03:10:03Z"
375                      message: the initial names have been accepted
376                      reason: InitialNamesAccepted
377                      status: "True"
378                      type: Established
379                storedVersions:
380                  - v1
381            "#;
382
383            let c = serde_yaml::from_str(crd).unwrap();
384            assert!(is_crd_established().matches_object(Some(&c)))
385        }
386
387        #[test]
388        /// fail when CRD is not yet ready
389        fn crd_established_fail() {
390            use super::{Condition, is_crd_established};
391
392            let crd = r#"
393                apiVersion: apiextensions.k8s.io/v1
394                kind: CustomResourceDefinition
395                metadata:
396                  name: testthings.kube.rs
397                spec:
398                  group: kube.rs
399                  names:
400                    categories: []
401                    kind: TestThing
402                    plural: testthings
403                    shortNames: []
404                    singular: testthing
405                  scope: Namespaced
406                  versions:
407                    - additionalPrinterColumns: []
408                      name: v1
409                      schema:
410                        openAPIV3Schema:
411                          type: object
412                          x-kubernetes-preserve-unknown-fields: true
413                      served: true
414                      storage: true
415                status:
416                  acceptedNames:
417                    kind: TestThing
418                    listKind: TestThingList
419                    plural: testthings
420                    singular: testthing
421                  conditions:
422                    - lastTransitionTime: "2025-03-06T03:10:03Z"
423                      message: no conflicts found
424                      reason: NoConflicts
425                      status: "True"
426                      type: NamesAccepted
427                    - lastTransitionTime: "2025-03-06T03:10:03Z"
428                      message: the initial names have been accepted
429                      reason: InitialNamesAccepted
430                      status: "False"
431                      type: Established
432                storedVersions:
433                  - v1
434            "#;
435
436            let c = serde_yaml::from_str(crd).unwrap();
437            assert!(!is_crd_established().matches_object(Some(&c)))
438        }
439
440        #[test]
441        /// fail when CRD does not exist
442        fn crd_established_missing() {
443            use super::{Condition, is_crd_established};
444
445            assert!(!is_crd_established().matches_object(None))
446        }
447
448        #[test]
449        /// pass when pod is running
450        fn pod_running_ok() {
451            use super::{Condition, is_pod_running};
452
453            let pod = r#"
454                apiVersion: v1
455                kind: Pod
456                metadata:
457                  namespace: default
458                  name: testpod
459                spec:
460                  containers:
461                    - name: testcontainer
462                      image: alpine
463                      command: [ sleep ]
464                      args: [ "100000" ]
465                status:
466                  conditions:
467                    - lastProbeTime: null
468                      lastTransitionTime: "2025-03-06T03:53:07Z"
469                      status: "True"
470                      type: PodReadyToStartContainers
471                    - lastProbeTime: null
472                      lastTransitionTime: "2025-03-06T03:52:58Z"
473                      status: "True"
474                      type: Initialized
475                    - lastProbeTime: null
476                      lastTransitionTime: "2025-03-06T03:53:24Z"
477                      status: "True"
478                      type: Ready
479                    - lastProbeTime: null
480                      lastTransitionTime: "2025-03-06T03:53:24Z"
481                      status: "True"
482                      type: ContainersReady
483                    - lastProbeTime: null
484                      lastTransitionTime: "2025-03-06T03:52:58Z"
485                      status: "True"
486                      type: PodScheduled
487                  containerStatuses:
488                    - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
489                      image: docker.io/library/alpine:latest
490                      imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
491                      lastState: {}
492                      name: testcontainer
493                      ready: true
494                      restartCount: 0
495                      started: true
496                      state:
497                        running:
498                          startedAt: "2025-03-06T03:59:20Z"
499                  phase: Running
500                  qosClass: Burstable
501            "#;
502
503            let p = serde_yaml::from_str(pod).unwrap();
504            assert!(is_pod_running().matches_object(Some(&p)))
505        }
506
507        #[test]
508        /// fail if pod is unschedulable
509        fn pod_running_unschedulable() {
510            use super::{Condition, is_pod_running};
511
512            let pod = r#"
513                apiVersion: v1
514                kind: Pod
515                metadata:
516                  namespace: default
517                  name: testpod
518                spec:
519                  containers:
520                    - name: testcontainer
521                      image: alpine
522                      command: [ sleep ]
523                      args: [ "100000" ]
524                status:
525                  conditions:
526                    - lastProbeTime: null
527                      lastTransitionTime: "2025-03-06T03:52:25Z"
528                      message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1
529                      nodes are available: 1 Preemption is not helpful for scheduling.'
530                      reason: Unschedulable
531                      status: "False"
532                      type: PodScheduled
533                  phase: Pending
534                  qosClass: Burstable
535            "#;
536
537            let p = serde_yaml::from_str(pod).unwrap();
538            assert!(!is_pod_running().matches_object(Some(&p)))
539        }
540
541        #[test]
542        /// fail if pod does not exist
543        fn pod_running_missing() {
544            use super::{Condition, is_pod_running};
545
546            assert!(!is_pod_running().matches_object(None))
547        }
548
549        #[test]
550        /// pass if job completed
551        fn job_completed_ok() {
552            use super::{Condition, is_job_completed};
553
554            let job = r#"
555                apiVersion: batch/v1
556                kind: Job
557                metadata:
558                  name: pi
559                  namespace: default
560                spec:
561                  template:
562                    spec:
563                      containers:
564                      - name: pi
565                        command:
566                        - perl
567                        - -Mbignum=bpi
568                        - -wle
569                        - print bpi(2000)
570                        image: perl:5.34.0
571                        imagePullPolicy: IfNotPresent
572                status:
573                  completionTime: "2025-03-06T05:27:56Z"
574                  conditions:
575                  - lastProbeTime: "2025-03-06T05:27:56Z"
576                    lastTransitionTime: "2025-03-06T05:27:56Z"
577                    message: Reached expected number of succeeded pods
578                    reason: CompletionsReached
579                    status: "True"
580                    type: SuccessCriteriaMet
581                  - lastProbeTime: "2025-03-06T05:27:56Z"
582                    lastTransitionTime: "2025-03-06T05:27:56Z"
583                    message: Reached expected number of succeeded pods
584                    reason: CompletionsReached
585                    status: "True"
586                    type: Complete
587                  ready: 0
588                  startTime: "2025-03-06T05:27:27Z"
589                  succeeded: 1
590                  terminating: 0
591                  uncountedTerminatedPods: {}
592            "#;
593
594            let j = serde_yaml::from_str(job).unwrap();
595            assert!(is_job_completed().matches_object(Some(&j)))
596        }
597
598        #[test]
599        /// fail if job is still in progress
600        fn job_completed_running() {
601            use super::{Condition, is_job_completed};
602
603            let job = r#"
604                apiVersion: batch/v1
605                kind: Job
606                metadata:
607                  name: pi
608                  namespace: default
609                spec:
610                  backoffLimit: 4
611                  completionMode: NonIndexed
612                  completions: 1
613                  manualSelector: false
614                  parallelism: 1
615                  template:
616                    spec:
617                      containers:
618                      - name: pi
619                        command:
620                        - perl
621                        - -Mbignum=bpi
622                        - -wle
623                        - print bpi(2000)
624                        image: perl:5.34.0
625                        imagePullPolicy: IfNotPresent
626                status:
627                  active: 1
628                  ready: 0
629                  startTime: "2025-03-06T05:27:27Z"
630                  terminating: 0
631                  uncountedTerminatedPods: {}
632            "#;
633
634            let j = serde_yaml::from_str(job).unwrap();
635            assert!(!is_job_completed().matches_object(Some(&j)))
636        }
637
638        #[test]
639        /// fail if job does not exist
640        fn job_completed_missing() {
641            use super::{Condition, is_job_completed};
642
643            assert!(!is_job_completed().matches_object(None))
644        }
645
646        #[test]
647        /// pass when deployment has been fully rolled out
648        fn deployment_completed_ok() {
649            use super::{Condition, is_deployment_completed};
650
651            let depl = r#"
652                apiVersion: apps/v1
653                kind: Deployment
654                metadata:
655                  name: testapp
656                  namespace: default
657                spec:
658                  progressDeadlineSeconds: 600
659                  replicas: 3
660                  revisionHistoryLimit: 10
661                  selector:
662                    matchLabels:
663                      app: test
664                  strategy:
665                    rollingUpdate:
666                      maxSurge: 25%
667                      maxUnavailable: 25%
668                    type: RollingUpdate
669                  template:
670                    metadata:
671                      creationTimestamp: null
672                      labels:
673                        app: test
674                    spec:
675                      containers:
676                      - image: postgres
677                        imagePullPolicy: Always
678                        name: postgres
679                        ports:
680                        - containerPort: 5432
681                          protocol: TCP
682                        env:
683                        - name: POSTGRES_PASSWORD
684                          value: foobar
685                status:
686                  availableReplicas: 3
687                  conditions:
688                  - lastTransitionTime: "2025-03-06T06:06:57Z"
689                    lastUpdateTime: "2025-03-06T06:06:57Z"
690                    message: Deployment has minimum availability.
691                    reason: MinimumReplicasAvailable
692                    status: "True"
693                    type: Available
694                  - lastTransitionTime: "2025-03-06T06:03:20Z"
695                    lastUpdateTime: "2025-03-06T06:06:57Z"
696                    message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
697                    reason: NewReplicaSetAvailable
698                    status: "True"
699                    type: Progressing
700                  observedGeneration: 2
701                  readyReplicas: 3
702                  replicas: 3
703                  updatedReplicas: 3
704            "#;
705
706            let d = serde_yaml::from_str(depl).unwrap();
707            assert!(is_deployment_completed().matches_object(Some(&d)))
708        }
709
710        #[test]
711        /// fail if deployment update is still rolling out
712        fn deployment_completed_pending() {
713            use super::{Condition, is_deployment_completed};
714
715            let depl = r#"
716                apiVersion: apps/v1
717                kind: Deployment
718                metadata:
719                  name: testapp
720                  namespace: default
721                spec:
722                  progressDeadlineSeconds: 600
723                  replicas: 3
724                  revisionHistoryLimit: 10
725                  selector:
726                    matchLabels:
727                      app: test
728                  strategy:
729                    rollingUpdate:
730                      maxSurge: 25%
731                      maxUnavailable: 25%
732                    type: RollingUpdate
733                  template:
734                    metadata:
735                      creationTimestamp: null
736                      labels:
737                        app: test
738                    spec:
739                      containers:
740                      - image: postgres
741                        imagePullPolicy: Always
742                        name: postgres
743                        ports:
744                        - containerPort: 5432
745                          protocol: TCP
746                        env:
747                        - name: POSTGRES_PASSWORD
748                          value: foobar
749                status:
750                  conditions:
751                  - lastTransitionTime: "2025-03-06T06:03:20Z"
752                    lastUpdateTime: "2025-03-06T06:03:20Z"
753                    message: Deployment does not have minimum availability.
754                    reason: MinimumReplicasUnavailable
755                    status: "False"
756                    type: Available
757                  - lastTransitionTime: "2025-03-06T06:03:20Z"
758                    lastUpdateTime: "2025-03-06T06:03:20Z"
759                    message: ReplicaSet "testapp-77789cd7d4" is progressing.
760                    reason: ReplicaSetUpdated
761                    status: "True"
762                    type: Progressing
763                  observedGeneration: 1
764                  replicas: 3
765                  unavailableReplicas: 3
766                  updatedReplicas: 3
767            "#;
768
769            let d = serde_yaml::from_str(depl).unwrap();
770            assert!(!is_deployment_completed().matches_object(Some(&d)))
771        }
772
773        #[test]
774        /// fail if deployment does not exist
775        fn deployment_completed_missing() {
776            use super::{Condition, is_deployment_completed};
777
778            assert!(!is_deployment_completed().matches_object(None))
779        }
780
781        #[test]
782        /// pass if loadbalancer service has received a loadbalancer IP
783        fn service_lb_provisioned_ok_ip() {
784            use super::{Condition, is_service_loadbalancer_provisioned};
785
786            let service = r"
787                apiVersion: v1
788                kind: Service
789                metadata:
790                  name: test
791                spec:
792                  selector:
793                    app.kubernetes.io/name: test
794                  type: LoadBalancer
795                  ports:
796                    - protocol: TCP
797                      port: 80
798                      targetPort: 9376
799                  clusterIP: 10.0.171.239
800                status:
801                  loadBalancer:
802                    ingress:
803                      - ip: 192.0.2.127
804            ";
805
806            let s = serde_yaml::from_str(service).unwrap();
807            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
808        }
809
810        #[test]
811        /// pass if loadbalancer service has received a loadbalancer hostname
812        fn service_lb_provisioned_ok_hostname() {
813            use super::{Condition, is_service_loadbalancer_provisioned};
814
815            let service = r"
816                apiVersion: v1
817                kind: Service
818                metadata:
819                  name: test
820                spec:
821                  selector:
822                    app.kubernetes.io/name: test
823                  type: LoadBalancer
824                  ports:
825                    - protocol: TCP
826                      port: 80
827                      targetPort: 9376
828                  clusterIP: 10.0.171.239
829                status:
830                  loadBalancer:
831                    ingress:
832                      - hostname: example.exposed.service
833            ";
834
835            let s = serde_yaml::from_str(service).unwrap();
836            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
837        }
838
839        #[test]
840        /// fail if loadbalancer service is still waiting for a LB
841        fn service_lb_provisioned_pending() {
842            use super::{Condition, is_service_loadbalancer_provisioned};
843
844            let service = r"
845                apiVersion: v1
846                kind: Service
847                metadata:
848                  name: test
849                spec:
850                  selector:
851                    app.kubernetes.io/name: test
852                  type: LoadBalancer
853                  ports:
854                    - protocol: TCP
855                      port: 80
856                      targetPort: 9376
857                  clusterIP: 10.0.171.239
858                status:
859                  loadBalancer: {}
860            ";
861
862            let s = serde_yaml::from_str(service).unwrap();
863            assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
864        }
865
866        #[test]
867        /// pass if service is not a loadbalancer
868        fn service_lb_provisioned_not_loadbalancer() {
869            use super::{Condition, is_service_loadbalancer_provisioned};
870
871            let service = r"
872                apiVersion: v1
873                kind: Service
874                metadata:
875                  name: test
876                spec:
877                  selector:
878                    app.kubernetes.io/name: test
879                  type: ClusterIP
880                  ports:
881                    - protocol: TCP
882                      port: 80
883                      targetPort: 9376
884                status:
885                  loadBalancer: {}
886            ";
887
888            let s = serde_yaml::from_str(service).unwrap();
889            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
890        }
891
892        #[test]
893        /// fail if service does not exist
894        fn service_lb_provisioned_missing() {
895            use super::{Condition, is_service_loadbalancer_provisioned};
896
897            assert!(!is_service_loadbalancer_provisioned().matches_object(None))
898        }
899
900        #[test]
901        /// pass when ingress has received a loadbalancer IP
902        fn ingress_provisioned_ok_ip() {
903            use super::{Condition, is_ingress_provisioned};
904
905            let ingress = r#"
906                apiVersion: networking.k8s.io/v1
907                kind: Ingress
908                metadata:
909                  name: test
910                  namespace: default
911                  resourceVersion: "1401"
912                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
913                spec:
914                  ingressClassName: nginx
915                  rules:
916                  - host: httpbin.local
917                    http:
918                      paths:
919                      - path: /
920                        backend:
921                          service:
922                            name: httpbin
923                            port:
924                              number: 80
925                status:
926                  loadBalancer:
927                    ingress:
928                      - ip: 10.89.7.3
929            "#;
930
931            let i = serde_yaml::from_str(ingress).unwrap();
932            assert!(is_ingress_provisioned().matches_object(Some(&i)))
933        }
934
935        #[test]
936        /// pass when ingress has received a loadbalancer hostname
937        fn ingress_provisioned_ok_hostname() {
938            use super::{Condition, is_ingress_provisioned};
939
940            let ingress = r#"
941                apiVersion: networking.k8s.io/v1
942                kind: Ingress
943                metadata:
944                  name: test
945                  namespace: default
946                  resourceVersion: "1401"
947                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
948                spec:
949                  ingressClassName: nginx
950                  rules:
951                  - host: httpbin.local
952                    http:
953                      paths:
954                      - path: /
955                        backend:
956                          service:
957                            name: httpbin
958                            port:
959                              number: 80
960                status:
961                  loadBalancer:
962                    ingress:
963                      - hostname: example.exposed.service
964            "#;
965
966            let i = serde_yaml::from_str(ingress).unwrap();
967            assert!(is_ingress_provisioned().matches_object(Some(&i)))
968        }
969
970        #[test]
971        /// fail if ingress is still waiting for a LB
972        fn ingress_provisioned_pending() {
973            use super::{Condition, is_ingress_provisioned};
974
975            let ingress = r#"
976                apiVersion: networking.k8s.io/v1
977                kind: Ingress
978                metadata:
979                  name: test
980                  namespace: default
981                  resourceVersion: "1401"
982                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
983                spec:
984                  ingressClassName: nginx
985                  rules:
986                  - host: httpbin.local
987                    http:
988                      paths:
989                      - path: /
990                        backend:
991                          service:
992                            name: httpbin
993                            port:
994                              number: 80
995                status:
996                  loadBalancer: {}
997            "#;
998
999            let i = serde_yaml::from_str(ingress).unwrap();
1000            assert!(!is_ingress_provisioned().matches_object(Some(&i)))
1001        }
1002
1003        #[test]
1004        /// fail if ingress does not exist
1005        fn ingress_provisioned_missing() {
1006            use super::{Condition, is_ingress_provisioned};
1007
1008            assert!(!is_ingress_provisioned().matches_object(None))
1009        }
1010    }
1011}
1012
1013/// Utilities for deleting objects
1014pub mod delete {
1015    use super::{await_condition, conditions};
1016    use kube_client::{Api, Resource, api::DeleteParams};
1017    use serde::de::DeserializeOwned;
1018    use std::fmt::Debug;
1019    use thiserror::Error;
1020
1021    /// Errors from `delete_and_finalize`
1022    #[derive(Debug, Error)]
1023    pub enum Error {
1024        /// No uid found on metadata.uid on deleted object
1025        #[error("deleted object has no UID to wait for")]
1026        NoUid,
1027
1028        /// Apiserver returned an error to the delete call
1029        #[error("failed to delete object: {0}")]
1030        Delete(#[source] kube_client::Error),
1031
1032        /// A watcher failed to probe the object for the `is_deleted` condition
1033        #[error("failed to wait for object to be deleted: {0}")]
1034        Await(#[source] super::Error),
1035    }
1036
1037    /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
1038    ///
1039    /// # Errors
1040    ///
1041    /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
1042    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
1043        api: Api<K>,
1044        name: &str,
1045        delete_params: &DeleteParams,
1046    ) -> Result<(), Error> {
1047        let deleted_obj_uid = api
1048            .delete(name, delete_params)
1049            .await
1050            .map_err(Error::Delete)?
1051            .either(
1052                |mut obj| obj.meta_mut().uid.take(),
1053                |status| status.details.map(|details| details.uid),
1054            )
1055            .ok_or(Error::NoUid)?;
1056        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
1057            .await
1058            .map_err(Error::Await)?;
1059        Ok(())
1060    }
1061}