Skip to main content

kube_client/
lib.rs

1//! Crate for interacting with the Kubernetes API
2//!
3//! This crate includes the tools for manipulating Kubernetes resources as
4//! well as keeping track of those resources as they change over time
5//!
6//! # Example
7//!
//! The following example will create a [`Pod`](k8s_openapi::api::core::v1::Pod)
//! via server-side apply and then list the pods in the configured namespace.
10//!
11//! ```rust,no_run
12//! use futures::{StreamExt, TryStreamExt};
13//! use kube_client::api::{Api, ResourceExt, ListParams, PatchParams, Patch};
14//! use kube_client::Client;
15//! use k8s_openapi::api::core::v1::Pod;
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
19//!     // Read the environment to find config for kube client.
20//!     // Note that this tries an in-cluster configuration first,
21//!     // then falls back on a kubeconfig file.
22//!     let client = Client::try_default().await?;
23//!
24//!     // Interact with pods in the configured namespace with the typed interface from k8s-openapi
25//!     let pods: Api<Pod> = Api::default_namespaced(client);
26//!
27//!     // Create a Pod (cheating here with json, but it has to validate against the type):
28//!     let patch: Pod = serde_json::from_value(serde_json::json!({
29//!         "apiVersion": "v1",
30//!         "kind": "Pod",
31//!         "metadata": {
32//!             "name": "my-pod"
33//!         },
34//!         "spec": {
35//!             "containers": [
36//!                 {
37//!                     "name": "my-container",
38//!                     "image": "myregistry.azurecr.io/hello-world:v1",
39//!                 },
40//!             ],
41//!         }
42//!     }))?;
43//!
44//!     // Apply the Pod via server-side apply
45//!     let params = PatchParams::apply("myapp");
46//!     let result = pods.patch("my-pod", &params, &Patch::Apply(&patch)).await?;
47//!
48//!     // List pods in the configured namespace
49//!     for p in pods.list(&ListParams::default()).await? {
50//!         println!("found pod {}", p.name_any());
51//!     }
52//!
53//!     Ok(())
54//! }
55//! ```
56//!
57//! For more details, see:
58//!
59//! - [`Client`](crate::client) for the extensible Kubernetes client
60//! - [`Config`](crate::config) for the Kubernetes config abstraction
61//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources
62//! - [k8s-openapi](https://docs.rs/k8s-openapi) for how to create typed kubernetes objects directly
63#![cfg_attr(docsrs, feature(doc_cfg))]
64// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
65#![allow(clippy::unnecessary_lazy_evaluations)]
66
// Wraps each supplied item so that it is compiled only when the `client`
// feature is enabled; when `docsrs` is set the item additionally carries the
// matching `doc(cfg(...))` attribute.
macro_rules! cfg_client {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "client")]
            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
            $gated
        )*
    };
}
// Wraps each supplied item so that it is compiled only when the `config`
// feature is enabled; when `docsrs` is set the item additionally carries the
// matching `doc(cfg(...))` attribute.
macro_rules! cfg_config {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "config")]
            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
            $gated
        )*
    };
}
85
// Wraps each supplied item so that it is compiled when either the `config`
// or the `client` feature is enabled; when `docsrs` is set the item
// additionally carries the matching `doc(cfg(...))` attribute.
macro_rules! cfg_error {
    ($($gated:item)*) => {
        $(
            #[cfg(any(feature = "config", feature = "client"))]
            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
            $gated
        )*
    };
}
95
96cfg_client! {
97    pub mod api;
98    pub mod discovery;
99    pub mod client;
100
101    #[doc(inline)]
102    pub use api::Api;
103    #[doc(inline)]
104    pub use client::Client;
105    #[doc(inline)]
106    pub use discovery::Discovery;
107}
108
109cfg_config! {
110    pub mod config;
111    #[doc(inline)]
112    pub use config::Config;
113}
114
cfg_error! {
    // Error module plus crate-root shortcuts for its `Error` type.
    pub mod error;
    #[doc(inline)] pub use error::Error;
    /// Convenient alias for `Result<T, Error>`
    pub type Result<T, E = Error> = std::result::Result<T, E>;
}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123/// Re-exports from kube_core
124pub use kube_core as core;
125
126// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube-client --lib --features=rustls-tls,ws -- --ignored`
128#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] // varying test imports depending on feature
131mod test {
132    use crate::{
133        Api, Client, Config, ResourceExt,
134        api::{AttachParams, AttachedProcess},
135        client::ConfigExt,
136    };
137    use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138    use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
139    use kube_core::{
140        params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
141        response::StatusSummary,
142    };
143    use serde_json::json;
144    use tower::ServiceBuilder;
145
146    // hard disabled test atm due to k3d rustls issues: https://github.com/kube-rs/kube/issues?q=is%3Aopen+is%3Aissue+label%3Arustls
147    #[allow(dead_code)]
148    // #[tokio::test]
149    #[ignore = "needs cluster (lists pods)"]
150    #[cfg(feature = "rustls-tls")]
151    async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
152        use hyper_util::rt::TokioExecutor;
153
154        let config = Config::infer().await?;
155        let https = config.rustls_https_connector()?;
156        let service = ServiceBuilder::new()
157            .layer(config.base_uri_layer())
158            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
159        let client = Client::new(service, config.default_namespace);
160        let pods: Api<Pod> = Api::default_namespaced(client);
161        pods.list(&Default::default()).await?;
162        Ok(())
163    }
164
165    #[tokio::test]
166    #[ignore = "needs cluster (lists pods)"]
167    #[cfg(feature = "openssl-tls")]
168    async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
169        use hyper_util::rt::TokioExecutor;
170
171        let config = Config::infer().await?;
172        let https = config.openssl_https_connector()?;
173        let service = ServiceBuilder::new()
174            .layer(config.base_uri_layer())
175            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
176        let client = Client::new(service, config.default_namespace);
177        let pods: Api<Pod> = Api::default_namespaced(client);
178        pods.list(&Default::default()).await?;
179        Ok(())
180    }
181
182    #[tokio::test]
183    #[ignore = "needs cluster (lists api resources)"]
184    #[cfg(feature = "client")]
185    async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
186        use crate::{core::DynamicObject, discovery};
187        let client = Client::try_default().await?;
188        let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
189        let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
190        let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
191        api.list(&Default::default()).await?;
192
193        Ok(())
194    }
195
196    #[tokio::test]
197    #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
198    #[cfg(feature = "client")]
199    async fn aggregated_discovery_apis() -> Result<(), Box<dyn std::error::Error>> {
200        let client = Client::try_default().await?;
201
202        // Test /apis aggregated discovery
203        let apis_discovery = client.list_api_groups_aggregated().await?;
204        assert!(!apis_discovery.items.is_empty(), "should have API groups");
205
206        // Find the apps group
207        let apps_group = apis_discovery
208            .items
209            .iter()
210            .find(|g| g.metadata.as_ref().and_then(|m| m.name.as_ref()) == Some(&"apps".to_string()));
211        assert!(apps_group.is_some(), "should have apps group");
212
213        let apps = apps_group.unwrap();
214        assert!(!apps.versions.is_empty(), "apps should have versions");
215
216        // Check that deployments resource exists in apps/v1
217        let v1 = apps.versions.iter().find(|v| v.version == Some("v1".to_string()));
218        assert!(v1.is_some(), "apps should have v1");
219
220        let deployments = v1
221            .unwrap()
222            .resources
223            .iter()
224            .find(|r| r.resource == Some("deployments".to_string()));
225        assert!(deployments.is_some(), "apps/v1 should have deployments");
226
227        Ok(())
228    }
229
230    #[tokio::test]
231    #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
232    #[cfg(feature = "client")]
233    async fn aggregated_discovery_core() -> Result<(), Box<dyn std::error::Error>> {
234        let client = Client::try_default().await?;
235
236        // Test /api aggregated discovery (core group)
237        let core_discovery = client.list_core_api_versions_aggregated().await?;
238        assert!(!core_discovery.items.is_empty(), "should have core group");
239
240        let core = &core_discovery.items[0];
241        let v1 = core.versions.iter().find(|v| v.version == Some("v1".to_string()));
242        assert!(v1.is_some(), "core should have v1");
243
244        // Check that pods resource exists
245        let pods = v1
246            .unwrap()
247            .resources
248            .iter()
249            .find(|r| r.resource == Some("pods".to_string()));
250        assert!(pods.is_some(), "core/v1 should have pods");
251
252        let pods_resource = pods.unwrap();
253        assert_eq!(pods_resource.scope, Some("Namespaced".to_string()));
254        assert!(pods_resource.verbs.contains(&"list".to_string()));
255
256        Ok(())
257    }
258
259    #[tokio::test]
260    #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
261    #[cfg(feature = "client")]
262    async fn discovery_run_aggregated() -> Result<(), Box<dyn std::error::Error>> {
263        use crate::discovery::{Discovery, verbs};
264
265        let client = Client::try_default().await?;
266
267        // Test Discovery::run_aggregated()
268        let discovery = Discovery::new(client.clone()).run_aggregated().await?;
269
270        // Should have discovered groups
271        assert!(discovery.groups().count() > 0, "should have discovered groups");
272
273        // Should have core group
274        assert!(discovery.has_group(""), "should have core group");
275
276        // Should have apps group
277        assert!(discovery.has_group("apps"), "should have apps group");
278
279        // Check that we can find deployments in apps group
280        let apps = discovery.get("apps").expect("apps group");
281        let (ar, caps) = apps.recommended_kind("Deployment").expect("Deployment kind");
282        assert_eq!(ar.kind, "Deployment");
283        assert!(caps.supports_operation(verbs::LIST));
284
285        // Check that we can find pods in core group
286        let core = discovery.get("").expect("core group");
287        let (ar, caps) = core.recommended_kind("Pod").expect("Pod kind");
288        assert_eq!(ar.kind, "Pod");
289        assert!(caps.supports_operation(verbs::GET));
290
291        Ok(())
292    }
293
294    #[tokio::test]
295    #[ignore = "needs cluster (will create and edit a pod)"]
296    async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
297        use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
298
299        let client = Client::try_default().await?;
300        let pods: Api<Pod> = Api::default_namespaced(client);
301
302        // create busybox pod that's alive for at most 30s
303        let p: Pod = serde_json::from_value(json!({
304            "apiVersion": "v1",
305            "kind": "Pod",
306            "metadata": {
307                "name": "busybox-kube1",
308                "labels": { "app": "kube-rs-test" },
309            },
310            "spec": {
311                "terminationGracePeriodSeconds": 1,
312                "restartPolicy": "Never",
313                "containers": [{
314                  "name": "busybox",
315                  "image": "busybox:stable",
316                  "command": ["sh", "-c", "sleep 30"],
317                }],
318            }
319        }))?;
320
321        let pp = PostParams::default();
322        match pods.create(&pp, &p).await {
323            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
324            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
325            Err(e) => return Err(e.into()),                         // any other case if a failure
326        }
327
328        // Manual watch-api for it to become ready
329        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
330        let wp = WatchParams::default()
331            .fields(&format!("metadata.name={}", "busybox-kube1"))
332            .timeout(15);
333        let mut stream = pods.watch(&wp, "0").await?.boxed();
334        while let Some(ev) = stream.try_next().await? {
335            // can debug format watch event
336            let _ = format!("we: {ev:?}");
337            match ev {
338                WatchEvent::Modified(o) => {
339                    let s = o.status.as_ref().expect("status exists on pod");
340                    let phase = s.phase.clone().unwrap_or_default();
341                    if phase == "Running" {
342                        break;
343                    }
344                }
345                WatchEvent::Error(e) => panic!("watch error: {e}"),
346                _ => {}
347            }
348        }
349
350        // Verify we can get it
351        let mut pod = pods.get("busybox-kube1").await?;
352        assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
353
354        // verify replace with explicit resource version
355        // NB: don't do this; use server side apply
356        {
357            assert!(pod.resource_version().is_some());
358            pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
359
360            let pp = PostParams::default();
361            let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
362            assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
363        }
364
365        // Delete it
366        let dp = DeleteParams::default();
367        pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
368            assert_eq!(pdel.name_unchecked(), "busybox-kube1");
369        });
370
371        Ok(())
372    }
373
    /// Creates a busybox pod, waits for it to run and then exercises `exec`
    /// in three ways: reading stdout of a one-shot command, reading stdout
    /// after stdin has been shut down, and driving an interactive `sh`
    /// through stdin. The pod is deleted at the end.
    #[tokio::test]
    #[ignore = "needs cluster (will create and attach to a pod)"]
    #[cfg(feature = "ws")]
    async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
        use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
        use tokio::io::AsyncWriteExt;

        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::default_namespaced(client);

        // create busybox pod that's alive for at most 30s
        let p: Pod = serde_json::from_value(json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "busybox-kube2",
                "labels": { "app": "kube-rs-test" },
            },
            "spec": {
                "terminationGracePeriodSeconds": 1,
                "restartPolicy": "Never",
                "containers": [{
                  "name": "busybox",
                  "image": "busybox:stable",
                  "command": ["sh", "-c", "sleep 30"],
                }],
            }
        }))?;

        match pods.create(&Default::default(), &p).await {
            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
            Err(e) => return Err(e.into()),                         // any other case is a failure
        }

        // Manual watch-api for it to become ready
        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
        let wp = WatchParams::default()
            .fields(&format!("metadata.name={}", "busybox-kube2"))
            .timeout(15);
        let mut stream = pods.watch(&wp, "0").await?.boxed();
        while let Some(ev) = stream.try_next().await? {
            match ev {
                WatchEvent::Modified(o) => {
                    let s = o.status.as_ref().expect("status exists on pod");
                    let phase = s.phase.clone().unwrap_or_default();
                    if phase == "Running" {
                        break;
                    }
                }
                WatchEvent::Error(e) => panic!("watch error: {e}"),
                _ => {}
            }
        }

        // Verify exec works and we can get the output
        {
            let mut attached = pods
                .exec(
                    "busybox-kube2",
                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
                    &AttachParams::default().stderr(false),
                )
                .await?;
            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
            // Chunks that fail to read or are not valid UTF-8 are dropped; the rest is concatenated.
            let out = stdout
                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
                .collect::<Vec<_>>()
                .await
                .join("");
            attached.join().await.unwrap();
            assert_eq!(out.lines().count(), 3);
            assert_eq!(out, "1\n2\n3\n");
        }

        // Verify we read from stdout after stdin is closed.
        {
            let name = "busybox-kube2";
            let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
            let ap = AttachParams::default().stdin(true).stderr(false);

            // Make a connection so we can determine if the K8s cluster supports stream closing.
            let mut req = pods.request.exec(name, command.clone(), &ap)?;
            req.extensions_mut().insert("exec");
            let stream = pods.client.connect(req).await?;

            // This only works if the cluster supports protocol version v5.channel.k8s.io
            // Skip for older protocols.
            if stream.supports_stream_close() {
                let mut attached = pods.exec(name, command, &ap).await?;
                let mut stdin_writer = attached.stdin().unwrap();
                let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());

                // The command never reads stdin; write something and then shut the writer down.
                stdin_writer.write_all(b"this will be ignored\n").await?;
                // Result of the shutdown is deliberately discarded.
                _ = stdin_writer.shutdown().await;

                let next_stdout = stdout_stream.next();
                let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
                assert_eq!(stdout, "test string 2\n");

                // AttachedProcess resolves with status object.
                let status = attached.take_status().unwrap();
                // The status assertions only run when a status object is actually delivered.
                if let Some(status) = status.await {
                    assert_eq!(status.status, Some("Success".to_owned()));
                    assert_eq!(status.reason, None);
                }
            }
        }

        // Verify we can write to Stdin
        {
            let mut attached = pods
                .exec(
                    "busybox-kube2",
                    vec!["sh"],
                    &AttachParams::default().stdin(true).stderr(false),
                )
                .await?;
            let mut stdin_writer = attached.stdin().unwrap();
            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
            // The future is created before the command is written and awaited afterwards.
            let next_stdout = stdout_stream.next();
            stdin_writer.write_all(b"echo test string 1\n").await?;
            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
            println!("{stdout}");
            assert_eq!(stdout, "test string 1\n");

            // AttachedProcess resolves with status object.
            // Send `exit 1` to get a failure status.
            stdin_writer.write_all(b"exit 1\n").await?;
            let status = attached.take_status().unwrap();
            // The status assertions only run when a status object is actually delivered.
            if let Some(status) = status.await {
                println!("{status:?}");
                assert_eq!(status.status, Some("Failure".to_owned()));
                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
            }
        }

        // Delete it
        let dp = DeleteParams::default();
        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
        });

        Ok(())
    }
519
520    #[tokio::test]
521    #[ignore = "needs cluster (will create and tail logs from a pod)"]
522    async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
523        use crate::{
524            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
525            core::subresource::LogParams,
526        };
527
528        let client = Client::try_default().await?;
529        let pods: Api<Pod> = Api::default_namespaced(client);
530
531        // create busybox pod that's alive for at most 30s
532        let p: Pod = serde_json::from_value(json!({
533            "apiVersion": "v1",
534            "kind": "Pod",
535            "metadata": {
536                "name": "busybox-kube3",
537                "labels": { "app": "kube-rs-test" },
538            },
539            "spec": {
540                "terminationGracePeriodSeconds": 1,
541                "restartPolicy": "Never",
542                "containers": [{
543                  "name": "busybox",
544                  "image": "busybox:stable",
545                  "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
546                }],
547            }
548        }))?;
549
550        match pods.create(&Default::default(), &p).await {
551            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
552            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
553            Err(e) => return Err(e.into()),                         // any other case if a failure
554        }
555
556        // Manual watch-api for it to become ready
557        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
558        let wp = WatchParams::default()
559            .fields(&format!("metadata.name={}", "busybox-kube3"))
560            .timeout(15);
561        let mut stream = pods.watch(&wp, "0").await?.boxed();
562        while let Some(ev) = stream.try_next().await? {
563            match ev {
564                WatchEvent::Modified(o) => {
565                    let s = o.status.as_ref().expect("status exists on pod");
566                    let phase = s.phase.clone().unwrap_or_default();
567                    if phase == "Running" {
568                        break;
569                    }
570                }
571                WatchEvent::Error(e) => panic!("watch error: {e}"),
572                _ => {}
573            }
574        }
575
576        // Get current list of logs
577        let lp = LogParams {
578            follow: true,
579            ..LogParams::default()
580        };
581        let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
582
583        // wait for container to finish
584        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
585
586        let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
587        assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
588
589        // individual logs may or may not buffer
590        let mut output = vec![];
591        while let Some(line) = logs_stream.try_next().await? {
592            output.push(line);
593        }
594        assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
595
596        // evict the pod
597        let ep = EvictParams::default();
598        let eres = pods.evict("busybox-kube3", &ep).await?;
599        assert_eq!(eres.code, 201); // created
600        assert!(eres.is_success());
601
602        Ok(())
603    }
604
605    #[tokio::test]
606    #[ignore = "requires a cluster"]
607    async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
608        use crate::{
609            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
610            core::subresource::LogParams,
611        };
612        use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
613
614        let client = Client::try_default().await?;
615        let pods: Api<Pod> = Api::default_namespaced(client);
616
617        // create busybox pod that's alive for at most 30s
618        let p: Pod = serde_json::from_value(json!({
619            "apiVersion": "v1",
620            "kind": "Pod",
621            "metadata": {
622                "name": "busybox-kube-meta",
623                "labels": { "app": "kube-rs-test" },
624            },
625            "spec": {
626                "terminationGracePeriodSeconds": 1,
627                "restartPolicy": "Never",
628                "containers": [{
629                  "name": "busybox",
630                  "image": "busybox:stable",
631                  "command": ["sh", "-c", "sleep 30s"],
632                }],
633            }
634        }))?;
635
636        match pods.create(&Default::default(), &p).await {
637            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
638            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
639            Err(e) => return Err(e.into()),                         // any other case if a failure
640        }
641
642        // Test we can get a pod as a PartialObjectMeta and convert to
643        // ObjectMeta
644        let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
645        assert_eq!("busybox-kube-meta", pod_metadata.name_any());
646        assert_eq!(
647            Some((&"app".to_string(), &"kube-rs-test".to_string())),
648            pod_metadata.labels().get_key_value("app")
649        );
650
651        // Test we can get a list of PartialObjectMeta for pods
652        let p_list = pods.list_metadata(&ListParams::default()).await?;
653
654        // Find only pod we are concerned with in this test and fail eagerly if
655        // name doesn't exist
656        let pod_metadata = p_list
657            .items
658            .into_iter()
659            .find(|p| p.name_any() == "busybox-kube-meta")
660            .unwrap();
661        assert_eq!(
662            pod_metadata.labels().get("app"),
663            Some(&"kube-rs-test".to_string())
664        );
665
666        // Attempt to patch pod metadata
667        let patch = ObjectMeta {
668            annotations: Some([("test".to_string(), "123".to_string())].into()),
669            ..Default::default()
670        }
671        .into_request_partial::<Pod>();
672
673        let patchparams = PatchParams::default();
674        let p_patched = pods
675            .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
676            .await?;
677        assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
678        assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
679        assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
680
681        // Clean-up
682        let dp = DeleteParams::default();
683        pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
684            assert_eq!(pdel.name_any(), "busybox-kube-meta");
685        });
686
687        Ok(())
688    }
689    #[tokio::test]
690    #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
691    async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
692        use crate::api::PostParams;
693        use k8s_openapi::api::certificates::v1::{
694            CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
695        };
696
697        let csr_name = "fake";
698        let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
699            "apiVersion": "certificates.k8s.io/v1",
700            "kind": "CertificateSigningRequest",
701            "metadata": { "name": csr_name },
702            "spec": {
703                "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
704                "signerName": "kubernetes.io/kube-apiserver-client",
705                "expirationSeconds": 86400,
706                "usages": ["client auth"]
707            }
708        }))?;
709
710        let client = Client::try_default().await?;
711        let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
712        assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
713
714        // Patch the approval and approve the CSR
715        let approval_type = "ApprovedFake";
716        let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
717            certificate: None,
718            conditions: Some(vec![CertificateSigningRequestCondition {
719                type_: approval_type.to_string(),
720                last_update_time: None,
721                last_transition_time: None,
722                message: Some(format!("{} {}", approval_type, "by kube-rs client")),
723                reason: Some("kube-rsClient".to_string()),
724                status: "True".to_string(),
725            }]),
726        };
727        let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
728        let _ = csr
729            .patch_approval(csr_name, &Default::default(), &csr_status_patch)
730            .await?;
731        let csr_after_approval = csr.get_approval(csr_name).await?;
732
733        assert_eq!(
734            csr_after_approval
735                .status
736                .as_ref()
737                .unwrap()
738                .conditions
739                .as_ref()
740                .unwrap()[0]
741                .type_,
742            approval_type.to_string()
743        );
744        csr.delete(csr_name, &DeleteParams::default()).await?;
745        Ok(())
746    }
747
748    #[tokio::test]
749    #[ignore = "needs cluster for ephemeral containers operations"]
750    async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
751        let client = Client::try_default().await?;
752
753        // Ephemeral containers were stabilized in Kubernetes v1.25.
754        // This test therefore exits early if the current cluster version is older than v1.25.
755        let api_version = client.apiserver_version().await?;
756        if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
757            return Ok(());
758        }
759
760        let pod: Pod = serde_json::from_value(serde_json::json!({
761            "apiVersion": "v1",
762            "kind": "Pod",
763            "metadata": {
764                "name": "ephemeral-container-test",
765                "labels": { "app": "kube-rs-test" },
766            },
767            "spec": {
768                "restartPolicy": "Never",
769                "containers": [{
770                  "name": "busybox",
771                  "image": "busybox:stable",
772                  "command": ["sh", "-c", "sleep 2"],
773                }],
774            }
775        }))?;
776
777        let pod_name = pod.name_any();
778        let pods = Api::<Pod>::default_namespaced(client);
779
780        // If cleanup failed and a pod already exists, we attempt to remove it
781        // before proceeding. This is important as ephemeral containers can't
782        // be removed from a Pod's spec. Therefore this test must start with a fresh
783        // Pod every time.
784        let _ = pods
785            .delete(&pod.name_any(), &DeleteParams::default())
786            .await
787            .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
788
789        // Ephemeral containes can only be applied to a running pod, so one must
790        // be created before any operations are tested.
791        match pods.create(&Default::default(), &pod).await {
792            Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
793            Err(e) => return Err(e.into()), // any other case if a failure
794        }
795
796        let current_ephemeral_containers = pods
797            .get_ephemeral_containers(&pod.name_any())
798            .await?
799            .spec
800            .unwrap()
801            .ephemeral_containers;
802
803        // We expect no ephemeral containers initially, get_ephemeral_containers should
804        // reflect that.
805        assert_eq!(current_ephemeral_containers, None);
806
807        let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
808            {
809                "name": "myephemeralcontainer1",
810                "image": "busybox:stable",
811                "command": ["sh", "-c", "sleep 2"],
812            }
813        ))?;
814
815        // Attempt to replace ephemeral containers.
816
817        let patch: Pod = serde_json::from_value(json!({
818            "metadata": { "name": pod_name },
819            "spec":{ "ephemeralContainers": [ busybox_eph ] }
820        }))?;
821
822        let current_containers = pods
823            .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
824            .await?
825            .spec
826            .unwrap()
827            .ephemeral_containers
828            .expect("could find ephemeral container");
829
830        // Note that we can't compare the whole ephemeral containers object, as some fields
831        // are set by the cluster. We therefore compare the fields specified in the patch.
832        assert_eq!(current_containers.len(), 1);
833        assert_eq!(current_containers[0].name, busybox_eph.name);
834        assert_eq!(current_containers[0].image, busybox_eph.image);
835        assert_eq!(current_containers[0].command, busybox_eph.command);
836
837        // Attempt to patch ephemeral containers.
838
839        // The new ephemeral container will have different values from the
840        // first to ensure we can test for its presence.
841        busybox_eph = serde_json::from_value(json!(
842            {
843                "name": "myephemeralcontainer2",
844                "image": "busybox:stable",
845                "command": ["sh", "-c", "sleep 1"],
846            }
847        ))?;
848
849        let patch: Pod =
850            serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
851
852        let current_containers = pods
853            .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
854            .await?
855            .spec
856            .unwrap()
857            .ephemeral_containers
858            .expect("could find ephemeral container");
859
860        // There should only be 2 ephemeral containers at this point,
861        // one from each patch
862        assert_eq!(current_containers.len(), 2);
863
864        let new_container = current_containers
865            .iter()
866            .find(|c| c.name == busybox_eph.name)
867            .expect("could find myephemeralcontainer2");
868
869        // Note that we can't compare the whole ephemeral container object, as some fields
870        // get set in the cluster. We therefore compare the fields specified in the patch.
871        assert_eq!(new_container.image, busybox_eph.image);
872        assert_eq!(new_container.command, busybox_eph.command);
873
874        // Attempt to get ephemeral containers.
875
876        let expected_containers = current_containers;
877
878        let current_containers = pods
879            .get_ephemeral_containers(&pod.name_any())
880            .await?
881            .spec
882            .unwrap()
883            .ephemeral_containers
884            .unwrap();
885
886        assert_eq!(current_containers, expected_containers);
887
888        pods.delete(&pod.name_any(), &DeleteParams::default())
889            .await?
890            .map_left(|pdel| {
891                assert_eq!(pdel.name_any(), pod.name_any());
892            });
893
894        Ok(())
895    }
896
897    #[tokio::test]
898    #[ignore = "needs kubelet debug methods"]
899    #[cfg(feature = "kubelet-debug")]
900    async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
901        use crate::{
902            api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
903            core::kubelet_debug::KubeletDebugParams,
904        };
905
906        let client = Client::try_default().await?;
907        let pods: Api<Pod> = Api::default_namespaced(client);
908
909        // create busybox pod that's alive for at most 30s
910        let p: Pod = serde_json::from_value(json!({
911            "apiVersion": "v1",
912            "kind": "Pod",
913            "metadata": {
914                "name": "busybox-kube2",
915                "labels": { "app": "kube-rs-test" },
916            },
917            "spec": {
918                "terminationGracePeriodSeconds": 1,
919                "restartPolicy": "Never",
920                "containers": [{
921                  "name": "busybox",
922                  "image": "busybox:stable",
923                  "command": ["sh", "-c", "sleep 30"],
924                }],
925            }
926        }))?;
927
928        match pods.create(&Default::default(), &p).await {
929            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
930            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
931            Err(e) => return Err(e.into()),                         // any other case if a failure
932        }
933
934        // Manual watch-api for it to become ready
935        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
936        let wp = WatchParams::default()
937            .fields(&format!("metadata.name={}", "busybox-kube2"))
938            .timeout(15);
939        let mut stream = pods.watch(&wp, "0").await?.boxed();
940        while let Some(ev) = stream.try_next().await? {
941            match ev {
942                WatchEvent::Modified(o) => {
943                    let s = o.status.as_ref().expect("status exists on pod");
944                    let phase = s.phase.clone().unwrap_or_default();
945                    if phase == "Running" {
946                        break;
947                    }
948                }
949                WatchEvent::Error(e) => panic!("watch error: {e}"),
950                _ => {}
951            }
952        }
953
954        let mut config = Config::infer().await?;
955        config.accept_invalid_certs = true;
956        config.cluster_url = "https://localhost:10250".parse().unwrap();
957        let kubelet_client: Client = config.try_into()?;
958
959        // Verify exec works and we can get the output
960        {
961            let mut attached = kubelet_client
962                .kubelet_node_exec(
963                    &KubeletDebugParams {
964                        name: "busybox-kube2",
965                        namespace: "default",
966                        ..Default::default()
967                    },
968                    "busybox",
969                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
970                    &AttachParams::default().stderr(false),
971                )
972                .await?;
973            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
974            let out = stdout
975                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
976                .collect::<Vec<_>>()
977                .await
978                .join("");
979            attached.join().await.unwrap();
980            assert_eq!(out.lines().count(), 3);
981            assert_eq!(out, "1\n2\n3\n");
982        }
983
984        // Verify we can write to Stdin
985        {
986            use tokio::io::AsyncWriteExt;
987            let mut attached = kubelet_client
988                .kubelet_node_exec(
989                    &KubeletDebugParams {
990                        name: "busybox-kube2",
991                        namespace: "default",
992                        ..Default::default()
993                    },
994                    "busybox",
995                    vec!["sh"],
996                    &AttachParams::default().stdin(true).stderr(false),
997                )
998                .await?;
999            let mut stdin_writer = attached.stdin().unwrap();
1000            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
1001            let next_stdout = stdout_stream.next();
1002            stdin_writer.write_all(b"echo test string 1\n").await?;
1003            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
1004            println!("{stdout}");
1005            assert_eq!(stdout, "test string 1\n");
1006
1007            // AttachedProcess resolves with status object.
1008            // Send `exit 1` to get a failure status.
1009            stdin_writer.write_all(b"exit 1\n").await?;
1010            let status = attached.take_status().unwrap();
1011            if let Some(status) = status.await {
1012                println!("{status:?}");
1013                assert_eq!(status.status, Some("Failure".to_owned()));
1014                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
1015            }
1016        }
1017
1018        // Delete it
1019        let dp = DeleteParams::default();
1020        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
1021            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
1022        });
1023
1024        Ok(())
1025    }
1026}