kube_client/api/
core_methods.rs

1use either::Either;
2use futures::Stream;
3use serde::{Serialize, de::DeserializeOwned};
4use std::fmt::Debug;
5
6use crate::{Error, Result, api::Api};
7use kube_core::{WatchEvent, metadata::PartialObjectMeta, object::ObjectList, params::*, response::Status};
8
/// GET/POST/PUT/PATCH/DELETE abstractions
10impl<K> Api<K>
11where
12    K: Clone + DeserializeOwned + Debug,
13{
14    /// Get a named resource
15    ///
16    /// ```no_run
17    /// # use kube::Api;
18    /// use k8s_openapi::api::core::v1::Pod;
19    ///
20    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
21    /// # let client: kube::Client = todo!();
22    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
23    /// let p: Pod = pods.get("blog").await?;
24    /// # Ok(())
25    /// # }
26    /// ```
27    ///
28    /// # Errors
29    ///
30    /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
31    /// Consider using [`Api::get_opt`] if you need to handle missing objects.
32    pub async fn get(&self, name: &str) -> Result<K> {
33        self.get_with(name, &GetParams::default()).await
34    }
35
36    ///  Get only the metadata for a named resource as [`PartialObjectMeta`]
37    ///
38    /// ```no_run
39    /// use kube::{Api, core::PartialObjectMeta};
40    /// use k8s_openapi::api::core::v1::Pod;
41    ///
42    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
43    /// # let client: kube::Client = todo!();
44    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
45    /// let p: PartialObjectMeta<Pod> = pods.get_metadata("blog").await?;
46    /// # Ok(())
47    /// # }
48    /// ```
49    /// Note that the type may be converted to `ObjectMeta` through the usual
50    /// conversion traits.
51    ///
52    /// # Errors
53    ///
54    /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
55    /// Consider using [`Api::get_metadata_opt`] if you need to handle missing objects.
56    pub async fn get_metadata(&self, name: &str) -> Result<PartialObjectMeta<K>> {
57        self.get_metadata_with(name, &GetParams::default()).await
58    }
59
60    /// [Get](`Api::get`) a named resource with an explicit resourceVersion
61    ///
62    /// This function allows the caller to pass in a [`GetParams`](`super::GetParams`) type containing
63    /// a `resourceVersion` to a [Get](`Api::get`) call.
64    /// For example
65    ///
66    /// ```no_run
67    /// # use kube::{Api, api::GetParams};
68    /// use k8s_openapi::api::core::v1::Pod;
69    ///
70    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
71    /// # let client: kube::Client = todo!();
72    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
73    /// let p: Pod = pods.get_with("blog", &GetParams::any()).await?;
74    /// # Ok(())
75    /// # }
76    /// ```
77    ///
78    /// # Errors
79    ///
80    /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
81    /// Consider using [`Api::get_opt`] if you need to handle missing objects.
82    pub async fn get_with(&self, name: &str, gp: &GetParams) -> Result<K> {
83        let mut req = self.request.get(name, gp).map_err(Error::BuildRequest)?;
84        req.extensions_mut().insert("get");
85        self.client.request::<K>(req).await
86    }
87
88    ///  [Get](`Api::get_metadata`) the metadata of an object using an explicit `resourceVersion`
89    ///
90    /// This function allows the caller to pass in a [`GetParams`](`super::GetParams`) type containing
91    /// a `resourceVersion` to a [Get](`Api::get_metadata`) call.
92    /// For example
93    ///
94    ///
95    /// ```no_run
96    /// use kube::{Api, api::GetParams, core::PartialObjectMeta};
97    /// use k8s_openapi::api::core::v1::Pod;
98    ///
99    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
100    /// # let client: kube::Client = todo!();
101    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
102    /// let p: PartialObjectMeta<Pod> = pods.get_metadata_with("blog", &GetParams::any()).await?;
103    /// # Ok(())
104    /// # }
105    /// ```
106    /// Note that the type may be converted to `ObjectMeta` through the usual
107    /// conversion traits.
108    ///
109    /// # Errors
110    ///
111    /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
112    /// Consider using [`Api::get_metadata_opt`] if you need to handle missing objects.
113    pub async fn get_metadata_with(&self, name: &str, gp: &GetParams) -> Result<PartialObjectMeta<K>> {
114        let mut req = self.request.get_metadata(name, gp).map_err(Error::BuildRequest)?;
115        req.extensions_mut().insert("get_metadata");
116        self.client.request::<PartialObjectMeta<K>>(req).await
117    }
118
119    /// [Get](`Api::get`) a named resource if it exists, returns [`None`] if it doesn't exist
120    ///
121    /// ```no_run
122    /// # use kube::Api;
123    /// use k8s_openapi::api::core::v1::Pod;
124    ///
125    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
126    /// # let client: kube::Client = todo!();
127    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
128    /// if let Some(pod) = pods.get_opt("blog").await? {
129    ///     // Pod was found
130    /// } else {
131    ///     // Pod was not found
132    /// }
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub async fn get_opt(&self, name: &str) -> Result<Option<K>> {
137        match self.get(name).await {
138            Ok(obj) => Ok(Some(obj)),
139            Err(Error::Api(status)) if status.is_not_found() => Ok(None),
140            Err(err) => Err(err),
141        }
142    }
143
144    /// [Get Metadata](`Api::get_metadata`) for a named resource if it exists, returns [`None`] if it doesn't exist
145    ///
146    /// ```no_run
147    /// # use kube::Api;
148    /// use k8s_openapi::api::core::v1::Pod;
149    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
150    /// # let client: kube::Client = todo!();
151    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
152    /// if let Some(pod) = pods.get_metadata_opt("blog").await? {
153    ///     // Pod was found
154    /// } else {
155    ///     // Pod was not found
156    /// }
157    /// # Ok(())
158    /// # }
159    /// ```
160    ///
161    /// Note that [`PartialObjectMeta`] embeds the raw `ObjectMeta`.
162    pub async fn get_metadata_opt(&self, name: &str) -> Result<Option<PartialObjectMeta<K>>> {
163        self.get_metadata_opt_with(name, &GetParams::default()).await
164    }
165
166    /// [Get Metadata](`Api::get_metadata`) of an object if it exists, using an explicit `resourceVersion`.
167    /// Returns [`None`] if it doesn't exist.
168    ///
169    /// ```no_run
170    /// # use kube::Api;
171    /// use k8s_openapi::api::core::v1::Pod;
172    /// use kube_core::params::GetParams;
173    ///
    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
175    /// # let client: kube::Client = todo!();
176    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
177    /// if let Some(pod) = pods.get_metadata_opt_with("blog", &GetParams::any()).await? {
178    ///     // Pod was found
179    /// } else {
180    ///     // Pod was not found
181    /// }
182    /// # Ok(())
183    /// # }
184    /// ```
185    ///
186    /// Note that [`PartialObjectMeta`] embeds the raw `ObjectMeta`.
187    pub async fn get_metadata_opt_with(
188        &self,
189        name: &str,
190        gp: &GetParams,
191    ) -> Result<Option<PartialObjectMeta<K>>> {
192        match self.get_metadata_with(name, gp).await {
193            Ok(meta) => Ok(Some(meta)),
194            Err(Error::Api(status)) if status.is_not_found() => Ok(None),
195            Err(err) => Err(err),
196        }
197    }
198
199    /// Get a list of resources
200    ///
201    /// You use this to get everything, or a subset matching fields/labels, say:
202    ///
203    /// ```no_run
204    /// use kube::api::{Api, ListParams, ResourceExt};
205    /// use k8s_openapi::api::core::v1::Pod;
206    ///
207    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
208    /// # let client: kube::Client = todo!();
209    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
210    /// let lp = ListParams::default().labels("app=blog"); // for this app only
211    /// for p in pods.list(&lp).await? {
212    ///     println!("Found Pod: {}", p.name_any());
213    /// }
214    /// # Ok(())
215    /// # }
216    /// ```
217    pub async fn list(&self, lp: &ListParams) -> Result<ObjectList<K>> {
218        let mut req = self.request.list(lp).map_err(Error::BuildRequest)?;
219        req.extensions_mut().insert("list");
220        self.client.request::<ObjectList<K>>(req).await
221    }
222
    /// Get a list of resources that contains only their metadata as [`PartialObjectMeta`]
224    ///
225    /// Similar to [list](`Api::list`), you use this to get everything, or a
226    /// subset matching fields/labels. For example
227    ///
228    /// ```no_run
229    /// use kube::api::{Api, ListParams, ResourceExt};
230    /// use kube::core::{ObjectMeta, ObjectList, PartialObjectMeta};
231    /// use k8s_openapi::api::core::v1::Pod;
232    ///
233    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
234    /// # let client: kube::Client = todo!();
235    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
236    /// let lp = ListParams::default().labels("app=blog"); // for this app only
237    /// let list: ObjectList<PartialObjectMeta<Pod>> = pods.list_metadata(&lp).await?;
238    /// for p in list {
239    ///     println!("Found Pod: {}", p.name_any());
240    /// }
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub async fn list_metadata(&self, lp: &ListParams) -> Result<ObjectList<PartialObjectMeta<K>>> {
245        let mut req = self.request.list_metadata(lp).map_err(Error::BuildRequest)?;
246        req.extensions_mut().insert("list_metadata");
247        self.client.request::<ObjectList<PartialObjectMeta<K>>>(req).await
248    }
249
250    /// Create a resource
251    ///
252    /// This function requires a type that Serializes to `K`, which can be:
253    /// 1. Raw string YAML
    ///     - easy to port from existing files
255    ///     - error prone (run-time errors on typos due to failed serialize attempts)
256    ///     - very error prone (can write invalid YAML)
257    /// 2. An instance of the struct itself
258    ///     - easy to instantiate for CRDs (you define the struct)
259    ///     - dense to instantiate for [`k8s_openapi`] types (due to many optionals)
260    ///     - compile-time safety
261    ///     - but still possible to write invalid native types (validation at apiserver)
262    /// 3. [`serde_json::json!`] macro instantiated [`serde_json::Value`]
263    ///     - Tradeoff between the two
264    ///     - Easy partially filling of native [`k8s_openapi`] types (most fields optional)
265    ///     - Partial safety against runtime errors (at least you must write valid JSON)
266    ///
267    /// Note that this method cannot write to the status object (when it exists) of a resource.
268    /// To set status objects please see [`Api::replace_status`] or [`Api::patch_status`].
269    pub async fn create(&self, pp: &PostParams, data: &K) -> Result<K>
270    where
271        K: Serialize,
272    {
273        let bytes = serde_json::to_vec(&data).map_err(Error::SerdeError)?;
274        let mut req = self.request.create(pp, bytes).map_err(Error::BuildRequest)?;
275        req.extensions_mut().insert("create");
276        self.client.request::<K>(req).await
277    }
278
279    /// Delete a named resource
280    ///
281    /// When you get a `K` via `Left`, your delete has started.
    /// When you get a `Status` via `Right`, this should be a 2XX style
    /// confirmation that the object is gone.
284    ///
285    /// 4XX and 5XX status types are returned as an [`Err(kube_client::Error::Api)`](crate::Error::Api).
286    ///
287    /// ```no_run
288    /// use kube::api::{Api, DeleteParams};
289    /// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1 as apiexts;
290    /// use apiexts::CustomResourceDefinition;
291    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
292    /// # let client: kube::Client = todo!();
293    /// let crds: Api<CustomResourceDefinition> = Api::all(client);
294    /// crds.delete("foos.clux.dev", &DeleteParams::default()).await?
295    ///     .map_left(|o| println!("Deleting CRD: {:?}", o.status))
296    ///     .map_right(|s| println!("Deleted CRD: {:?}", s));
297    /// # Ok(())
298    /// # }
299    /// ```
300    pub async fn delete(&self, name: &str, dp: &DeleteParams) -> Result<Either<K, Status>> {
301        let mut req = self.request.delete(name, dp).map_err(Error::BuildRequest)?;
302        req.extensions_mut().insert("delete");
303        self.client.request_status::<K>(req).await
304    }
305
306    /// Delete a collection of resources
307    ///
308    /// When you get an `ObjectList<K>` via `Left`, your delete has started.
    /// When you get a `Status` via `Right`, this should be a 2XX style
    /// confirmation that the objects are gone.
311    ///
312    /// 4XX and 5XX status types are returned as an [`Err(kube_client::Error::Api)`](crate::Error::Api).
313    ///
314    /// ```no_run
315    /// use kube::api::{Api, DeleteParams, ListParams, ResourceExt};
316    /// use k8s_openapi::api::core::v1::Pod;
317    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
318    /// # let client: kube::Client = todo!();
319    ///
320    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
321    /// match pods.delete_collection(&DeleteParams::default(), &ListParams::default()).await? {
322    ///     either::Left(list) => {
323    ///         let names: Vec<_> = list.iter().map(ResourceExt::name_any).collect();
324    ///         println!("Deleting collection of pods: {:?}", names);
325    ///     },
326    ///     either::Right(status) => {
327    ///         println!("Deleted collection of pods: status={:?}", status);
328    ///     }
329    /// }
330    /// # Ok(())
331    /// # }
332    /// ```
333    pub async fn delete_collection(
334        &self,
335        dp: &DeleteParams,
336        lp: &ListParams,
337    ) -> Result<Either<ObjectList<K>, Status>> {
338        let mut req = self
339            .request
340            .delete_collection(dp, lp)
341            .map_err(Error::BuildRequest)?;
342        req.extensions_mut().insert("delete_collection");
343        self.client.request_status::<ObjectList<K>>(req).await
344    }
345
346    /// Patch a subset of a resource's properties
347    ///
348    /// Takes a [`Patch`] along with [`PatchParams`] for the call.
349    ///
350    /// ```no_run
351    /// use kube::api::{Api, PatchParams, Patch, Resource};
352    /// use k8s_openapi::api::core::v1::Pod;
353    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
354    /// # let client: kube::Client = todo!();
355    ///
356    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
357    /// let patch = serde_json::json!({
358    ///     "apiVersion": "v1",
359    ///     "kind": "Pod",
360    ///     "metadata": {
361    ///         "name": "blog"
362    ///     },
363    ///     "spec": {
364    ///         "activeDeadlineSeconds": 5
365    ///     }
366    /// });
367    /// let params = PatchParams::apply("myapp");
368    /// let patch = Patch::Apply(&patch);
369    /// let o_patched = pods.patch("blog", &params, &patch).await?;
370    /// # Ok(())
371    /// # }
372    /// ```
373    /// [`Patch`]: super::Patch
374    /// [`PatchParams`]: super::PatchParams
375    ///
376    /// Note that this method cannot write to the status object (when it exists) of a resource.
377    /// To set status objects please see [`Api::replace_status`] or [`Api::patch_status`].
378    pub async fn patch<P: Serialize + Debug>(
379        &self,
380        name: &str,
381        pp: &PatchParams,
382        patch: &Patch<P>,
383    ) -> Result<K> {
384        let mut req = self.request.patch(name, pp, patch).map_err(Error::BuildRequest)?;
385        req.extensions_mut().insert("patch");
386        self.client.request::<K>(req).await
387    }
388
389    /// Patch a metadata subset of a resource's properties from [`PartialObjectMeta`]
390    ///
391    /// Takes a [`Patch`] along with [`PatchParams`] for the call.
392    /// Patches can be constructed raw using `serde_json::json!` or from `ObjectMeta` via [`PartialObjectMetaExt`].
393    ///
394    /// ```no_run
395    /// use kube::api::{Api, PatchParams, Patch, Resource};
396    /// use kube::core::{PartialObjectMetaExt, ObjectMeta};
397    /// use k8s_openapi::api::core::v1::Pod;
398    ///
399    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
400    /// # let client: kube::Client = todo!();
401    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
402    /// let metadata = ObjectMeta {
403    ///     labels: Some([("key".to_string(), "value".to_string())].into()),
404    ///     ..Default::default()
405    /// }.into_request_partial::<Pod>();
406    ///
407    /// let params = PatchParams::apply("myapp");
408    /// let o_patched = pods.patch_metadata("blog", &params, &Patch::Apply(&metadata)).await?;
409    /// println!("Patched {}", o_patched.metadata.name.unwrap());
410    /// # Ok(())
411    /// # }
412    /// ```
413    /// [`Patch`]: super::Patch
414    /// [`PatchParams`]: super::PatchParams
415    /// [`PartialObjectMetaExt`]: crate::core::PartialObjectMetaExt
416    ///
417    /// ### Warnings
418    ///
419    /// The `TypeMeta` (apiVersion + kind) of a patch request (required for apply patches)
420    /// must match the underlying type that is being patched (e.g. "v1" + "Pod").
421    /// The returned `TypeMeta` will always be {"meta.k8s.io/v1", "PartialObjectMetadata"}.
422    /// These constraints are encoded into [`PartialObjectMetaExt`].
423    ///
424    /// This method can write to non-metadata fields such as spec if included in the patch.
425    pub async fn patch_metadata<P: Serialize + Debug>(
426        &self,
427        name: &str,
428        pp: &PatchParams,
429        patch: &Patch<P>,
430    ) -> Result<PartialObjectMeta<K>> {
431        let mut req = self
432            .request
433            .patch_metadata(name, pp, patch)
434            .map_err(Error::BuildRequest)?;
435        req.extensions_mut().insert("patch_metadata");
436        self.client.request::<PartialObjectMeta<K>>(req).await
437    }
438
439    /// Replace a resource entirely with a new one
440    ///
441    /// This is used just like [`Api::create`], but with one additional instruction:
442    /// You must set `metadata.resourceVersion` in the provided data because k8s
443    /// will not accept an update unless you actually knew what the last version was.
444    ///
445    /// Thus, to use this function, you need to do a `get` then a `replace` with its result.
446    ///
447    /// ```no_run
448    /// use kube::api::{Api, PostParams, ResourceExt};
449    /// use k8s_openapi::api::batch::v1::Job;
450    ///
451    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
452    /// # let client: kube::Client = todo!();
453    /// let jobs: Api<Job> = Api::namespaced(client, "apps");
454    /// let j = jobs.get("baz").await?;
455    /// let j_new: Job = serde_json::from_value(serde_json::json!({
456    ///     "apiVersion": "batch/v1",
457    ///     "kind": "Job",
458    ///     "metadata": {
459    ///         "name": "baz",
460    ///         "resourceVersion": j.resource_version(),
461    ///     },
462    ///     "spec": {
463    ///         "template": {
464    ///             "metadata": {
465    ///                 "name": "empty-job-pod"
466    ///             },
467    ///             "spec": {
468    ///                 "containers": [{
469    ///                     "name": "empty",
470    ///                     "image": "alpine:latest"
471    ///                 }],
472    ///                 "restartPolicy": "Never",
473    ///             }
474    ///         }
475    ///     }
476    /// }))?;
477    /// jobs.replace("baz", &PostParams::default(), &j_new).await?;
478    /// # Ok(())
479    /// # }
480    /// ```
481    ///
482    /// Consider mutating the result of `api.get` rather than recreating it.
483    ///
484    /// Note that this method cannot write to the status object (when it exists) of a resource.
485    /// To set status objects please see [`Api::replace_status`] or [`Api::patch_status`].
486    pub async fn replace(&self, name: &str, pp: &PostParams, data: &K) -> Result<K>
487    where
488        K: Serialize,
489    {
490        let bytes = serde_json::to_vec(&data).map_err(Error::SerdeError)?;
491        let mut req = self
492            .request
493            .replace(name, pp, bytes)
494            .map_err(Error::BuildRequest)?;
495        req.extensions_mut().insert("replace");
496        self.client.request::<K>(req).await
497    }
498
499    /// Watch a list of resources
500    ///
501    /// This returns a future that awaits the initial response,
502    /// then you can stream the remaining buffered `WatchEvent` objects.
503    ///
504    /// Note that a `watch` call can terminate for many reasons (even before the specified
505    /// [`WatchParams::timeout`] is triggered), and will have to be re-issued
506    /// with the last seen resource version when or if it closes.
507    ///
508    /// Consider using a managed [`watcher`] to deal with automatic re-watches and error cases.
509    ///
510    /// ```no_run
511    /// use kube::api::{Api, WatchParams, ResourceExt, WatchEvent};
512    /// use k8s_openapi::api::batch::v1::Job;
513    /// use futures::{StreamExt, TryStreamExt};
514    ///
515    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
516    /// # let client: kube::Client = todo!();
517    /// let jobs: Api<Job> = Api::namespaced(client, "apps");
518    /// let lp = WatchParams::default()
519    ///     .fields("metadata.name=my_job")
520    ///     .timeout(20); // upper bound of how long we watch for
521    /// let mut stream = jobs.watch(&lp, "0").await?.boxed();
522    /// while let Some(status) = stream.try_next().await? {
523    ///     match status {
524    ///         WatchEvent::Added(s) => println!("Added {}", s.name_any()),
525    ///         WatchEvent::Modified(s) => println!("Modified: {}", s.name_any()),
526    ///         WatchEvent::Deleted(s) => println!("Deleted {}", s.name_any()),
527    ///         WatchEvent::Bookmark(s) => {},
528    ///         WatchEvent::Error(s) => println!("{}", s),
529    ///     }
530    /// }
531    /// # Ok(())
532    /// # }
533    /// ```
534    /// [`WatchParams::timeout`]: super::WatchParams::timeout
535    /// [`watcher`]: https://docs.rs/kube_runtime/*/kube_runtime/watcher/fn.watcher.html
536    pub async fn watch(
537        &self,
538        wp: &WatchParams,
539        version: &str,
540    ) -> Result<impl Stream<Item = Result<WatchEvent<K>>> + use<K>> {
541        let mut req = self.request.watch(wp, version).map_err(Error::BuildRequest)?;
542        req.extensions_mut().insert("watch");
543        self.client.request_events::<K>(req).await
544    }
545
    /// Watch a list of metadata for the given resources
547    ///
548    /// This returns a future that awaits the initial response,
549    /// then you can stream the remaining buffered `WatchEvent` objects.
550    ///
551    /// Note that a `watch_metadata` call can terminate for many reasons (even
552    /// before the specified [`WatchParams::timeout`] is triggered), and will
553    /// have to be re-issued with the last seen resource version when or if it
554    /// closes.
555    ///
556    /// Consider using a managed [`metadata_watcher`] to deal with automatic re-watches and error cases.
557    ///
558    /// ```no_run
559    /// use kube::api::{Api, WatchParams, ResourceExt, WatchEvent};
560    /// use k8s_openapi::api::batch::v1::Job;
561    /// use futures::{StreamExt, TryStreamExt};
562    ///
563    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
564    /// # let client: kube::Client = todo!();
565    /// let jobs: Api<Job> = Api::namespaced(client, "apps");
566    ///
567    /// let lp = WatchParams::default()
568    ///     .fields("metadata.name=my_job")
569    ///     .timeout(20); // upper bound of how long we watch for
    /// let mut stream = jobs.watch_metadata(&lp, "0").await?.boxed();
571    /// while let Some(status) = stream.try_next().await? {
572    ///     match status {
573    ///         WatchEvent::Added(s) => println!("Added {}", s.metadata.name.unwrap()),
574    ///         WatchEvent::Modified(s) => println!("Modified: {}", s.metadata.name.unwrap()),
575    ///         WatchEvent::Deleted(s) => println!("Deleted {}", s.metadata.name.unwrap()),
576    ///         WatchEvent::Bookmark(s) => {},
577    ///         WatchEvent::Error(s) => println!("{}", s),
578    ///     }
579    /// }
580    /// # Ok(())
581    /// # }
582    /// ```
583    /// [`WatchParams::timeout`]: super::WatchParams::timeout
584    /// [`metadata_watcher`]: https://docs.rs/kube_runtime/*/kube_runtime/watcher/fn.metadata_watcher.html
585    pub async fn watch_metadata(
586        &self,
587        wp: &WatchParams,
588        version: &str,
589    ) -> Result<impl Stream<Item = Result<WatchEvent<PartialObjectMeta<K>>>> + use<K>> {
590        let mut req = self
591            .request
592            .watch_metadata(wp, version)
593            .map_err(Error::BuildRequest)?;
594        req.extensions_mut().insert("watch_metadata");
595        self.client.request_events::<PartialObjectMeta<K>>(req).await
596    }
597}