1use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
/// Errors that [`await_condition`] can return while waiting for a [`Condition`].
#[derive(Debug, Error)]
pub enum Error {
    /// The watch on the object produced an error before a matching state was seen.
    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
    ProbeFailed(#[source] watcher::Error),
}
19
20#[allow(clippy::missing_panics_doc)] pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
55where
56 K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
57{
58 let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
60 let matches = cond.matches_object(obj.as_ref());
61 future::ready(Ok(!matches))
62 }));
63
64 let obj = stream
66 .try_next()
67 .await
68 .map_err(Error::ProbeFailed)?
69 .expect("stream must not terminate");
70 Ok(obj)
71}
72
/// A predicate over the observed state of an object of type `K`.
///
/// [`await_condition`] evaluates it against each state of the watched object.
/// Any `Fn(Option<&K>) -> bool` implements this trait, and ready-made
/// conditions live in the [`conditions`] module.
pub trait Condition<K> {
    /// Returns whether the condition holds for `obj`.
    ///
    /// `obj` is `None` when there is no object to inspect; implementations
    /// decide whether that counts as a match (e.g. [`conditions::is_deleted`]
    /// matches it, [`conditions::is_created`] does not).
    fn matches_object(&self, obj: Option<&K>) -> bool;

    /// Returns a condition that holds exactly when `self` does not.
    fn not(self) -> conditions::Not<Self>
    where
        Self: Sized,
    {
        conditions::Not(self)
    }

    /// Returns a condition that holds when both `self` and `other` hold.
    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
    where
        Self: Sized,
    {
        conditions::And(self, other)
    }

    /// Returns a condition that holds when `self` or `other` (or both) hold.
    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
    where
        Self: Sized,
    {
        conditions::Or(self, other)
    }
}
156
157impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
158 fn matches_object(&self, obj: Option<&K>) -> bool {
159 (self)(obj)
160 }
161}
162
163pub mod conditions {
165 pub use super::Condition;
166 use k8s_openapi::{
167 api::{
168 apps::v1::Deployment,
169 batch::v1::Job,
170 core::v1::{Pod, Service},
171 networking::v1::Ingress,
172 },
173 apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
174 };
175 use kube_client::Resource;
176
177 #[must_use]
183 pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
184 move |obj: Option<&K>| {
185 obj.is_none_or(
187 |obj| obj.meta().uid.as_deref() != Some(uid),
189 )
190 }
191 }
192
193 #[must_use]
197 pub fn is_created<K: Resource>() -> impl Condition<K> {
198 |obj: Option<&K>| obj.is_some()
199 }
200
201 #[must_use]
206 pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
207 |obj: Option<&CustomResourceDefinition>| {
208 if let Some(o) = obj
209 && let Some(s) = &o.status
210 && let Some(conds) = &s.conditions
211 && let Some(pcond) = conds.iter().find(|c| c.type_ == "Established")
212 {
213 return pcond.status == "True";
214 }
215 false
216 }
217 }
218
219 #[must_use]
221 pub fn is_pod_running() -> impl Condition<Pod> {
222 |obj: Option<&Pod>| {
223 if let Some(pod) = &obj
224 && let Some(status) = &pod.status
225 && let Some(phase) = &status.phase
226 {
227 return phase == "Running";
228 }
229 false
230 }
231 }
232
233 #[must_use]
235 pub fn is_job_completed() -> impl Condition<Job> {
236 |obj: Option<&Job>| {
237 if let Some(job) = &obj
238 && let Some(s) = &job.status
239 && let Some(conds) = &s.conditions
240 && let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete")
241 {
242 return pcond.status == "True";
243 }
244 false
245 }
246 }
247
248 #[must_use]
253 pub fn is_deployment_completed() -> impl Condition<Deployment> {
254 |obj: Option<&Deployment>| {
255 if let Some(depl) = &obj
256 && let Some(s) = &depl.status
257 && let Some(conds) = &s.conditions
258 && let Some(dcond) = conds.iter().find(|c| {
259 c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
260 })
261 {
262 return dcond.status == "True";
263 }
264 false
265 }
266 }
267
268 #[must_use]
270 pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
271 |obj: Option<&Service>| {
272 if let Some(svc) = &obj
273 && let Some(spec) = &svc.spec
274 {
275 if spec.type_ != Some("LoadBalancer".to_string()) {
277 return true;
278 }
279 if let Some(s) = &svc.status
281 && let Some(lbs) = &s.load_balancer
282 && let Some(ings) = &lbs.ingress
283 {
284 return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
285 }
286 }
287 false
288 }
289 }
290
291 #[must_use]
293 pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
294 |obj: Option<&Ingress>| {
295 if let Some(ing) = &obj
296 && let Some(s) = &ing.status
297 && let Some(lbs) = &s.load_balancer
298 && let Some(ings) = &lbs.ingress
299 {
300 return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
301 }
302 false
303 }
304 }
305
306 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
308 pub struct Not<A>(pub(super) A);
309 impl<A: Condition<K>, K> Condition<K> for Not<A> {
310 fn matches_object(&self, obj: Option<&K>) -> bool {
311 !self.0.matches_object(obj)
312 }
313 }
314
315 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
317 pub struct And<A, B>(pub(super) A, pub(super) B);
318 impl<A, B, K> Condition<K> for And<A, B>
319 where
320 A: Condition<K>,
321 B: Condition<K>,
322 {
323 fn matches_object(&self, obj: Option<&K>) -> bool {
324 self.0.matches_object(obj) && self.1.matches_object(obj)
325 }
326 }
327
328 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
330 pub struct Or<A, B>(pub(super) A, pub(super) B);
331 impl<A, B, K> Condition<K> for Or<A, B>
332 where
333 A: Condition<K>,
334 B: Condition<K>,
335 {
336 fn matches_object(&self, obj: Option<&K>) -> bool {
337 self.0.matches_object(obj) || self.1.matches_object(obj)
338 }
339 }
340
341 mod tests {
342 #[test]
343 fn crd_established_ok() {
345 use super::{Condition, is_crd_established};
346
347 let crd = r#"
348 apiVersion: apiextensions.k8s.io/v1
349 kind: CustomResourceDefinition
350 metadata:
351 name: testthings.kube.rs
352 spec:
353 group: kube.rs
354 names:
355 categories: []
356 kind: TestThing
357 plural: testthings
358 shortNames: []
359 singular: testthing
360 scope: Namespaced
361 versions:
362 - additionalPrinterColumns: []
363 name: v1
364 schema:
365 openAPIV3Schema:
366 type: object
367 x-kubernetes-preserve-unknown-fields: true
368 served: true
369 storage: true
370 status:
371 acceptedNames:
372 kind: TestThing
373 listKind: TestThingList
374 plural: testthings
375 singular: testthing
376 conditions:
377 - lastTransitionTime: "2025-03-06T03:10:03Z"
378 message: no conflicts found
379 reason: NoConflicts
380 status: "True"
381 type: NamesAccepted
382 - lastTransitionTime: "2025-03-06T03:10:03Z"
383 message: the initial names have been accepted
384 reason: InitialNamesAccepted
385 status: "True"
386 type: Established
387 storedVersions:
388 - v1
389 "#;
390
391 let c = serde_saphyr::from_str(crd).unwrap();
392 assert!(is_crd_established().matches_object(Some(&c)))
393 }
394
395 #[test]
396 fn crd_established_fail() {
398 use super::{Condition, is_crd_established};
399
400 let crd = r#"
401 apiVersion: apiextensions.k8s.io/v1
402 kind: CustomResourceDefinition
403 metadata:
404 name: testthings.kube.rs
405 spec:
406 group: kube.rs
407 names:
408 categories: []
409 kind: TestThing
410 plural: testthings
411 shortNames: []
412 singular: testthing
413 scope: Namespaced
414 versions:
415 - additionalPrinterColumns: []
416 name: v1
417 schema:
418 openAPIV3Schema:
419 type: object
420 x-kubernetes-preserve-unknown-fields: true
421 served: true
422 storage: true
423 status:
424 acceptedNames:
425 kind: TestThing
426 listKind: TestThingList
427 plural: testthings
428 singular: testthing
429 conditions:
430 - lastTransitionTime: "2025-03-06T03:10:03Z"
431 message: no conflicts found
432 reason: NoConflicts
433 status: "True"
434 type: NamesAccepted
435 - lastTransitionTime: "2025-03-06T03:10:03Z"
436 message: the initial names have been accepted
437 reason: InitialNamesAccepted
438 status: "False"
439 type: Established
440 storedVersions:
441 - v1
442 "#;
443
444 let c = serde_saphyr::from_str(crd).unwrap();
445 assert!(!is_crd_established().matches_object(Some(&c)))
446 }
447
448 #[test]
449 fn crd_established_missing() {
451 use super::{Condition, is_crd_established};
452
453 assert!(!is_crd_established().matches_object(None))
454 }
455
456 #[test]
457 fn pod_running_ok() {
459 use super::{Condition, is_pod_running};
460
461 let pod = r#"
462 apiVersion: v1
463 kind: Pod
464 metadata:
465 namespace: default
466 name: testpod
467 spec:
468 containers:
469 - name: testcontainer
470 image: alpine
471 command: [ sleep ]
472 args: [ "100000" ]
473 status:
474 conditions:
475 - lastProbeTime: null
476 lastTransitionTime: "2025-03-06T03:53:07Z"
477 status: "True"
478 type: PodReadyToStartContainers
479 - lastProbeTime: null
480 lastTransitionTime: "2025-03-06T03:52:58Z"
481 status: "True"
482 type: Initialized
483 - lastProbeTime: null
484 lastTransitionTime: "2025-03-06T03:53:24Z"
485 status: "True"
486 type: Ready
487 - lastProbeTime: null
488 lastTransitionTime: "2025-03-06T03:53:24Z"
489 status: "True"
490 type: ContainersReady
491 - lastProbeTime: null
492 lastTransitionTime: "2025-03-06T03:52:58Z"
493 status: "True"
494 type: PodScheduled
495 containerStatuses:
496 - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
497 image: docker.io/library/alpine:latest
498 imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
499 lastState: {}
500 name: testcontainer
501 ready: true
502 restartCount: 0
503 started: true
504 state:
505 running:
506 startedAt: "2025-03-06T03:59:20Z"
507 phase: Running
508 qosClass: Burstable
509 "#;
510
511 let p = serde_saphyr::from_str(pod).unwrap();
512 assert!(is_pod_running().matches_object(Some(&p)))
513 }
514
515 #[test]
516 fn pod_running_unschedulable() {
518 use super::{Condition, is_pod_running};
519
520 let pod = r#"
521 apiVersion: v1
522 kind: Pod
523 metadata:
524 namespace: default
525 name: testpod
526 spec:
527 containers:
528 - name: testcontainer
529 image: alpine
530 command: [ sleep ]
531 args: [ "100000" ]
532 status:
533 conditions:
534 - lastProbeTime: null
535 lastTransitionTime: "2025-03-06T03:52:25Z"
536 message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling.'
537 reason: Unschedulable
538 status: "False"
539 type: PodScheduled
540 phase: Pending
541 qosClass: Burstable
542 "#;
543
544 let p = serde_saphyr::from_str(pod).unwrap();
545 assert!(!is_pod_running().matches_object(Some(&p)))
546 }
547
548 #[test]
549 fn pod_running_missing() {
551 use super::{Condition, is_pod_running};
552
553 assert!(!is_pod_running().matches_object(None))
554 }
555
556 #[test]
557 fn job_completed_ok() {
559 use super::{Condition, is_job_completed};
560
561 let job = r#"
562 apiVersion: batch/v1
563 kind: Job
564 metadata:
565 name: pi
566 namespace: default
567 spec:
568 template:
569 spec:
570 containers:
571 - name: pi
572 command:
573 - perl
574 - -Mbignum=bpi
575 - -wle
576 - print bpi(2000)
577 image: perl:5.34.0
578 imagePullPolicy: IfNotPresent
579 status:
580 completionTime: "2025-03-06T05:27:56Z"
581 conditions:
582 - lastProbeTime: "2025-03-06T05:27:56Z"
583 lastTransitionTime: "2025-03-06T05:27:56Z"
584 message: Reached expected number of succeeded pods
585 reason: CompletionsReached
586 status: "True"
587 type: SuccessCriteriaMet
588 - lastProbeTime: "2025-03-06T05:27:56Z"
589 lastTransitionTime: "2025-03-06T05:27:56Z"
590 message: Reached expected number of succeeded pods
591 reason: CompletionsReached
592 status: "True"
593 type: Complete
594 ready: 0
595 startTime: "2025-03-06T05:27:27Z"
596 succeeded: 1
597 terminating: 0
598 uncountedTerminatedPods: {}
599 "#;
600
601 let j = serde_saphyr::from_str(job).unwrap();
602 assert!(is_job_completed().matches_object(Some(&j)))
603 }
604
605 #[test]
606 fn job_completed_running() {
608 use super::{Condition, is_job_completed};
609
610 let job = r#"
611 apiVersion: batch/v1
612 kind: Job
613 metadata:
614 name: pi
615 namespace: default
616 spec:
617 backoffLimit: 4
618 completionMode: NonIndexed
619 completions: 1
620 manualSelector: false
621 parallelism: 1
622 template:
623 spec:
624 containers:
625 - name: pi
626 command:
627 - perl
628 - -Mbignum=bpi
629 - -wle
630 - print bpi(2000)
631 image: perl:5.34.0
632 imagePullPolicy: IfNotPresent
633 status:
634 active: 1
635 ready: 0
636 startTime: "2025-03-06T05:27:27Z"
637 terminating: 0
638 uncountedTerminatedPods: {}
639 "#;
640
641 let j = serde_saphyr::from_str(job).unwrap();
642 assert!(!is_job_completed().matches_object(Some(&j)))
643 }
644
645 #[test]
646 fn job_completed_missing() {
648 use super::{Condition, is_job_completed};
649
650 assert!(!is_job_completed().matches_object(None))
651 }
652
653 #[test]
654 fn deployment_completed_ok() {
656 use super::{Condition, is_deployment_completed};
657
658 let depl = r#"
659 apiVersion: apps/v1
660 kind: Deployment
661 metadata:
662 name: testapp
663 namespace: default
664 spec:
665 progressDeadlineSeconds: 600
666 replicas: 3
667 revisionHistoryLimit: 10
668 selector:
669 matchLabels:
670 app: test
671 strategy:
672 rollingUpdate:
673 maxSurge: 25%
674 maxUnavailable: 25%
675 type: RollingUpdate
676 template:
677 metadata:
678 creationTimestamp: null
679 labels:
680 app: test
681 spec:
682 containers:
683 - image: postgres
684 imagePullPolicy: Always
685 name: postgres
686 ports:
687 - containerPort: 5432
688 protocol: TCP
689 env:
690 - name: POSTGRES_PASSWORD
691 value: foobar
692 status:
693 availableReplicas: 3
694 conditions:
695 - lastTransitionTime: "2025-03-06T06:06:57Z"
696 lastUpdateTime: "2025-03-06T06:06:57Z"
697 message: Deployment has minimum availability.
698 reason: MinimumReplicasAvailable
699 status: "True"
700 type: Available
701 - lastTransitionTime: "2025-03-06T06:03:20Z"
702 lastUpdateTime: "2025-03-06T06:06:57Z"
703 message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
704 reason: NewReplicaSetAvailable
705 status: "True"
706 type: Progressing
707 observedGeneration: 2
708 readyReplicas: 3
709 replicas: 3
710 updatedReplicas: 3
711 "#;
712
713 let d = serde_saphyr::from_str(depl).unwrap();
714 assert!(is_deployment_completed().matches_object(Some(&d)))
715 }
716
717 #[test]
718 fn deployment_completed_pending() {
720 use super::{Condition, is_deployment_completed};
721
722 let depl = r#"
723 apiVersion: apps/v1
724 kind: Deployment
725 metadata:
726 name: testapp
727 namespace: default
728 spec:
729 progressDeadlineSeconds: 600
730 replicas: 3
731 revisionHistoryLimit: 10
732 selector:
733 matchLabels:
734 app: test
735 strategy:
736 rollingUpdate:
737 maxSurge: 25%
738 maxUnavailable: 25%
739 type: RollingUpdate
740 template:
741 metadata:
742 creationTimestamp: null
743 labels:
744 app: test
745 spec:
746 containers:
747 - image: postgres
748 imagePullPolicy: Always
749 name: postgres
750 ports:
751 - containerPort: 5432
752 protocol: TCP
753 env:
754 - name: POSTGRES_PASSWORD
755 value: foobar
756 status:
757 conditions:
758 - lastTransitionTime: "2025-03-06T06:03:20Z"
759 lastUpdateTime: "2025-03-06T06:03:20Z"
760 message: Deployment does not have minimum availability.
761 reason: MinimumReplicasUnavailable
762 status: "False"
763 type: Available
764 - lastTransitionTime: "2025-03-06T06:03:20Z"
765 lastUpdateTime: "2025-03-06T06:03:20Z"
766 message: ReplicaSet "testapp-77789cd7d4" is progressing.
767 reason: ReplicaSetUpdated
768 status: "True"
769 type: Progressing
770 observedGeneration: 1
771 replicas: 3
772 unavailableReplicas: 3
773 updatedReplicas: 3
774 "#;
775
776 let d = serde_saphyr::from_str(depl).unwrap();
777 assert!(!is_deployment_completed().matches_object(Some(&d)))
778 }
779
780 #[test]
781 fn deployment_completed_missing() {
783 use super::{Condition, is_deployment_completed};
784
785 assert!(!is_deployment_completed().matches_object(None))
786 }
787
788 #[test]
789 fn service_lb_provisioned_ok_ip() {
791 use super::{Condition, is_service_loadbalancer_provisioned};
792
793 let service = r"
794 apiVersion: v1
795 kind: Service
796 metadata:
797 name: test
798 spec:
799 selector:
800 app.kubernetes.io/name: test
801 type: LoadBalancer
802 ports:
803 - protocol: TCP
804 port: 80
805 targetPort: 9376
806 clusterIP: 10.0.171.239
807 status:
808 loadBalancer:
809 ingress:
810 - ip: 192.0.2.127
811 ";
812
813 let s = serde_saphyr::from_str(service).unwrap();
814 assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
815 }
816
817 #[test]
818 fn service_lb_provisioned_ok_hostname() {
820 use super::{Condition, is_service_loadbalancer_provisioned};
821
822 let service = r"
823 apiVersion: v1
824 kind: Service
825 metadata:
826 name: test
827 spec:
828 selector:
829 app.kubernetes.io/name: test
830 type: LoadBalancer
831 ports:
832 - protocol: TCP
833 port: 80
834 targetPort: 9376
835 clusterIP: 10.0.171.239
836 status:
837 loadBalancer:
838 ingress:
839 - hostname: example.exposed.service
840 ";
841
842 let s = serde_saphyr::from_str(service).unwrap();
843 assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
844 }
845
846 #[test]
847 fn service_lb_provisioned_pending() {
849 use super::{Condition, is_service_loadbalancer_provisioned};
850
851 let service = r"
852 apiVersion: v1
853 kind: Service
854 metadata:
855 name: test
856 spec:
857 selector:
858 app.kubernetes.io/name: test
859 type: LoadBalancer
860 ports:
861 - protocol: TCP
862 port: 80
863 targetPort: 9376
864 clusterIP: 10.0.171.239
865 status:
866 loadBalancer: {}
867 ";
868
869 let s = serde_saphyr::from_str(service).unwrap();
870 assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
871 }
872
873 #[test]
874 fn service_lb_provisioned_not_loadbalancer() {
876 use super::{Condition, is_service_loadbalancer_provisioned};
877
878 let service = r"
879 apiVersion: v1
880 kind: Service
881 metadata:
882 name: test
883 spec:
884 selector:
885 app.kubernetes.io/name: test
886 type: ClusterIP
887 ports:
888 - protocol: TCP
889 port: 80
890 targetPort: 9376
891 status:
892 loadBalancer: {}
893 ";
894
895 let s = serde_saphyr::from_str(service).unwrap();
896 assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
897 }
898
899 #[test]
900 fn service_lb_provisioned_missing() {
902 use super::{Condition, is_service_loadbalancer_provisioned};
903
904 assert!(!is_service_loadbalancer_provisioned().matches_object(None))
905 }
906
907 #[test]
908 fn ingress_provisioned_ok_ip() {
910 use super::{Condition, is_ingress_provisioned};
911
912 let ingress = r#"
913 apiVersion: networking.k8s.io/v1
914 kind: Ingress
915 metadata:
916 name: test
917 namespace: default
918 resourceVersion: "1401"
919 uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
920 spec:
921 ingressClassName: nginx
922 rules:
923 - host: httpbin.local
924 http:
925 paths:
926 - path: /
927 backend:
928 service:
929 name: httpbin
930 port:
931 number: 80
932 status:
933 loadBalancer:
934 ingress:
935 - ip: 10.89.7.3
936 "#;
937
938 let i = serde_saphyr::from_str(ingress).unwrap();
939 assert!(is_ingress_provisioned().matches_object(Some(&i)))
940 }
941
942 #[test]
943 fn ingress_provisioned_ok_hostname() {
945 use super::{Condition, is_ingress_provisioned};
946
947 let ingress = r#"
948 apiVersion: networking.k8s.io/v1
949 kind: Ingress
950 metadata:
951 name: test
952 namespace: default
953 resourceVersion: "1401"
954 uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
955 spec:
956 ingressClassName: nginx
957 rules:
958 - host: httpbin.local
959 http:
960 paths:
961 - path: /
962 backend:
963 service:
964 name: httpbin
965 port:
966 number: 80
967 status:
968 loadBalancer:
969 ingress:
970 - hostname: example.exposed.service
971 "#;
972
973 let i = serde_saphyr::from_str(ingress).unwrap();
974 assert!(is_ingress_provisioned().matches_object(Some(&i)))
975 }
976
977 #[test]
978 fn ingress_provisioned_pending() {
980 use super::{Condition, is_ingress_provisioned};
981
982 let ingress = r#"
983 apiVersion: networking.k8s.io/v1
984 kind: Ingress
985 metadata:
986 name: test
987 namespace: default
988 resourceVersion: "1401"
989 uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
990 spec:
991 ingressClassName: nginx
992 rules:
993 - host: httpbin.local
994 http:
995 paths:
996 - path: /
997 backend:
998 service:
999 name: httpbin
1000 port:
1001 number: 80
1002 status:
1003 loadBalancer: {}
1004 "#;
1005
1006 let i = serde_saphyr::from_str(ingress).unwrap();
1007 assert!(!is_ingress_provisioned().matches_object(Some(&i)))
1008 }
1009
1010 #[test]
1011 fn ingress_provisioned_missing() {
1013 use super::{Condition, is_ingress_provisioned};
1014
1015 assert!(!is_ingress_provisioned().matches_object(None))
1016 }
1017 }
1018}
1019
1020pub mod delete {
1022 use super::{await_condition, conditions};
1023 use kube_client::{Api, Resource, api::DeleteParams};
1024 use serde::de::DeserializeOwned;
1025 use std::fmt::Debug;
1026 use thiserror::Error;
1027
    /// Errors that [`delete_and_finalize`] can return.
    #[derive(Debug, Error)]
    pub enum Error {
        /// The delete response carried no UID, so there is nothing to wait on.
        #[error("deleted object has no UID to wait for")]
        NoUid,

        /// The delete request itself failed.
        #[error("failed to delete object: {0}")]
        Delete(#[source] kube_client::Error),

        /// The delete request succeeded, but waiting for the object to go away failed.
        #[error("failed to wait for object to be deleted: {0}")]
        Await(#[source] super::Error),
    }
1043
1044 pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
1050 api: Api<K>,
1051 name: &str,
1052 delete_params: &DeleteParams,
1053 ) -> Result<(), Error> {
1054 let deleted_obj_uid = api
1055 .delete(name, delete_params)
1056 .await
1057 .map_err(Error::Delete)?
1058 .either(
1059 |mut obj| obj.meta_mut().uid.take(),
1060 |status| status.details.map(|details| details.uid),
1061 )
1062 .ok_or(Error::NoUid)?;
1063 await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
1064 .await
1065 .map_err(Error::Await)?;
1066 Ok(())
1067 }
1068}