kube_client_ext/
ext2.rs

1use std::collections::BTreeMap;
2use std::fmt;
3
4use client::ResourceExt;
5use k8s::OwnerReferenceExt;
6
7use super::*;
8
9/// Async extentions to `kube::Client`
10///
11#[async_trait::async_trait]
12pub trait KubeClientExt2: KubeClientExt {
13    /// Get named configmap from a given (or default) namespace
14    /// Return `None` if not found`
15    ///
16    async fn get_configmap_opt(
17        &self,
18        name: &str,
19        namespace: impl Into<Option<&str>> + Send,
20    ) -> client::Result<Option<corev1::ConfigMap>> {
21        self.configmaps(namespace).get_opt(name).await
22    }
23
24    /// Get named configmap from a given (or default) namespace
25    ///
26    async fn get_configmap(
27        &self,
28        name: &str,
29        namespace: impl Into<Option<&str>> + Send,
30    ) -> client::Result<corev1::ConfigMap> {
31        self.configmaps(namespace).get(name).await
32    }
33
34    /// Get named secret from a given (or default) namespace
35    /// Return `None` if not found`
36    ///
37    async fn get_secret_opt(
38        &self,
39        name: &str,
40        namespace: impl Into<Option<&str>> + Send,
41    ) -> client::Result<Option<corev1::Secret>> {
42        self.secrets(namespace).get_opt(name).await
43    }
44
45    /// Get named secret from a given (or default) namespace
46    ///
47    async fn get_secret(
48        &self,
49        name: &str,
50        namespace: impl Into<Option<&str>> + Send,
51    ) -> client::Result<corev1::Secret> {
52        self.secrets(namespace).get(name).await
53    }
54
55    /// Get named deployment from a given (or default) namespace
56    /// Return `None` if not found
57    ///
58    async fn get_deployment_opt(
59        &self,
60        name: &str,
61        namespace: impl Into<Option<&str>> + Send,
62    ) -> client::Result<Option<appsv1::Deployment>> {
63        self.deployments(namespace).get_opt(name).await
64    }
65
66    /// Get named deployment from a given (or default) namespace
67    ///
68    async fn get_deployment(
69        &self,
70        name: &str,
71        namespace: impl Into<Option<&str>> + Send,
72    ) -> client::Result<appsv1::Deployment> {
73        self.deployments(namespace).get(name).await
74    }
75
76    /// Get named api service
77    /// Return `None` if not found
78    ///
79    async fn get_apiservice_opt(
80        &self,
81        name: &str,
82    ) -> client::Result<Option<apiregistrationv1::APIService>> {
83        self.apiservices().get_opt(name).await
84    }
85
86    /// Get named api service
87    ///
88    async fn get_apiservice(&self, name: &str) -> client::Result<apiregistrationv1::APIService> {
89        self.apiservices().get(name).await
90    }
91
92    /// Get named CRD
93    /// Return `None` if not found
94    ///
95    async fn get_crd_opt(
96        &self,
97        name: &str,
98    ) -> client::Result<Option<apiextensionsv1::CustomResourceDefinition>> {
99        self.crds().get_opt(name).await
100    }
101
102    /// Get named CRD
103    ///
104    async fn get_crd(
105        &self,
106        name: &str,
107    ) -> client::Result<apiextensionsv1::CustomResourceDefinition> {
108        self.crds().get(name).await
109    }
110
111    /// Get owner object from `ownerReference` assuming it is of kind `K`
112    ///
113    async fn get_owner_k<O, K>(&self, o: &O) -> client::Result<Option<K>>
114    where
115        O: client::ResourceExt + Sync,
116        K: Clone
117            + fmt::Debug
118            + k8s::openapi::serde::de::DeserializeOwned
119            + client::Resource<Scope = k8s::openapi::NamespaceResourceScope>,
120        <K as client::Resource>::DynamicType: Default,
121    {
122        let dynamic_default = K::DynamicType::default();
123        let kind = K::kind(&dynamic_default);
124        let namespace = o.namespace();
125        if let Some(name) = o
126            .owner_references()
127            .iter()
128            .find(|owner| owner.kind == kind)
129            .map(|owner| &owner.name)
130        {
131            self.namespaced_k(namespace.as_deref()).get_opt(name).await
132        } else {
133            Ok(None)
134        }
135    }
136
137    /// List all `Pod`s  in a given (or default) namespace
138    ///
139    async fn list_pods(
140        &self,
141        namespace: impl Into<Option<&str>> + Send,
142    ) -> client::Result<Vec<corev1::Pod>> {
143        self.list_k(namespace).await
144    }
145
146    /// List all `Deployment`s in a given (or default) namespace
147    ///
148    async fn list_deployments(
149        &self,
150        namespace: impl Into<Option<&str>> + Send,
151    ) -> client::Result<Vec<appsv1::Deployment>> {
152        self.list_k(namespace).await
153    }
154
155    /// List all `ReplicaSets` in a given (or default) namespace
156    ///
157    async fn list_replicasets(
158        &self,
159        namespace: impl Into<Option<&str>> + Send,
160    ) -> client::Result<Vec<appsv1::ReplicaSet>> {
161        self.list_k(namespace).await
162    }
163
164    /// List all `Job`s in a given (or default) namespace
165    ///
166    async fn list_jobs(
167        &self,
168        namespace: impl Into<Option<&str>> + Send,
169    ) -> client::Result<Vec<batchv1::Job>> {
170        self.list_k(namespace).await
171    }
172
173    /// List all `CronJob`s in a given (or default) namespace
174    ///
175    async fn list_cronjobs(
176        &self,
177        namespace: impl Into<Option<&str>> + Send,
178    ) -> client::Result<Vec<batchv1::CronJob>> {
179        self.list_k(namespace).await
180    }
181
182    /// List namespaced objects of kind `K` in a given (or default) namespace
183    ///
184    async fn list_k<K>(&self, namespace: impl Into<Option<&str>> + Send) -> client::Result<Vec<K>>
185    where
186        K: Clone
187            + fmt::Debug
188            + k8s::openapi::serde::de::DeserializeOwned
189            + client::Resource<Scope = k8s::openapi::NamespaceResourceScope>,
190        <K as client::Resource>::DynamicType: Default,
191    {
192        let lp = self.list_params();
193        self.namespaced_k(namespace)
194            .list(&lp)
195            .await
196            .map(|list| list.items)
197    }
198
199    /// Get all the pods associated with the deployment
200    /// The logic is based on what `kubectl describe` does
201    ///
202    async fn get_pods_by_deployment_name(
203        &self,
204        name: &str,
205        namespace: impl Into<Option<&str>> + Send,
206    ) -> client::Result<Option<Vec<corev1::Pod>>> {
207        // Get the deployment
208        let Some(deployment) = self.get_deployment_opt(name, namespace).await? else {
209            return Ok(None);
210        };
211
212        self.get_pods_by_deployment(&deployment).await
213    }
214
215    /// Get all the pods associated with the `deployment`
216    /// The logic is based on what `kubectl describe` does
217    ///
218    async fn get_pods_by_deployment(
219        &self,
220        deployment: &appsv1::Deployment,
221    ) -> client::Result<Option<Vec<corev1::Pod>>> {
222        let namespace = deployment.namespace();
223        // Get all its replicas
224        let mut replicasets = self
225            .list_replicasets(namespace.as_deref())
226            .await?
227            .into_iter()
228            .filter(|rs| rs.is_controlled_by(deployment))
229            .collect::<Vec<_>>();
230
231        // Find the `NewReplicaSet`
232        replicasets.sort_by_key(|rs| rs.creation_timestamp());
233        let Some(new) = replicasets
234            .iter()
235            .find(|rs| match_template_spec_no_hash(rs, deployment))
236        else {
237            return Ok(None);
238        };
239
240        // Find all the Pods controlled by this ReplicaSet
241        let pods = self
242            .list_pods(namespace.as_deref())
243            .await?
244            .into_iter()
245            .filter(|pod| pod.is_controlled_by(new))
246            .collect();
247
248        Ok(Some(pods))
249    }
250}
251
252impl KubeClientExt2 for client::Client {}
253
254fn match_template_spec_no_hash(rs: &appsv1::ReplicaSet, deployment: &appsv1::Deployment) -> bool {
255    let rs_template = rs_pod_template(rs).map(remove_hash);
256    let deployment_template = deployment_pod_template(deployment).map(remove_hash);
257    rs_template == deployment_template
258}
259
260fn remove_hash(template: &corev1::PodTemplateSpec) -> corev1::PodTemplateSpec {
261    let mut template = template.clone();
262    if let Some(labels) = labels_mut(&mut template) {
263        labels.remove(k8s::label::DEFAULT_DEPLOYMENT_UNIQUE_LABEL_KEY);
264    }
265    template
266}
267
268fn labels_mut(template: &mut corev1::PodTemplateSpec) -> Option<&mut BTreeMap<String, String>> {
269    template.metadata.as_mut()?.labels.as_mut()
270}
271
272fn rs_pod_template(rs: &appsv1::ReplicaSet) -> Option<&corev1::PodTemplateSpec> {
273    rs.spec.as_ref()?.template.as_ref()
274}
275
276fn deployment_pod_template(deployment: &appsv1::Deployment) -> Option<&corev1::PodTemplateSpec> {
277    deployment.spec.as_ref().map(|spec| &spec.template)
278}