kube-client 0.78.0

Kubernetes client
Documentation
//! Crate for interacting with the Kubernetes API
//!
//! This crate includes the tools for manipulating Kubernetes resources as
//! well as keeping track of those resources as they change over time
//!
//! # Example
//!
//! The following example will apply a [`Pod`](k8s_openapi::api::core::v1::Pod)
//! via server-side apply and then list the pods in the configured namespace.
//!
//! ```rust,no_run
//! use futures::{StreamExt, TryStreamExt};
//! use kube_client::api::{Api, ResourceExt, ListParams, PatchParams, Patch};
//! use kube_client::Client;
//! use k8s_openapi::api::core::v1::Pod;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Read the environment to find config for kube client.
//!     // Note that this tries an in-cluster configuration first,
//!     // then falls back on a kubeconfig file.
//!     let client = Client::try_default().await?;
//!
//!     // Interact with pods in the configured namespace with the typed interface from k8s-openapi
//!     let pods: Api<Pod> = Api::default_namespaced(client);
//!
//!     // Create a Pod (cheating here with json, but it has to validate against the type):
//!     let patch: Pod = serde_json::from_value(serde_json::json!({
//!         "apiVersion": "v1",
//!         "kind": "Pod",
//!         "metadata": {
//!             "name": "my-pod"
//!         },
//!         "spec": {
//!             "containers": [
//!                 {
//!                     "name": "my-container",
//!                     "image": "myregistry.azurecr.io/hello-world:v1",
//!                 },
//!             ],
//!         }
//!     }))?;
//!
//!     // Apply the Pod via server-side apply
//!     let params = PatchParams::apply("myapp");
//!     let result = pods.patch("my-pod", &params, &Patch::Apply(&patch)).await?;
//!
//!     // List pods in the configured namespace
//!     for p in pods.list(&ListParams::default()).await? {
//!         println!("found pod {}", p.name_any());
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! For more details, see:
//!
//! - [`Client`](crate::client) for the extensible Kubernetes client
//! - [`Config`](crate::config) for the Kubernetes config abstraction
//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources
//! - [k8s-openapi](https://docs.rs/k8s-openapi/*/k8s_openapi/) for how to create typed kubernetes objects directly
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs)]
#![forbid(unsafe_code)]
// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
#![allow(clippy::unnecessary_lazy_evaluations)]

// Wraps any number of items so that each one is compiled only when the `client`
// feature is enabled. When built with `--cfg docsrs`, each item additionally
// receives the matching `doc(cfg(...))` attribute.
macro_rules! cfg_client {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "client")]
            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
            $gated
        )*
    };
}
// Wraps any number of items so that each one is compiled only when the `config`
// feature is enabled. When built with `--cfg docsrs`, each item additionally
// receives the matching `doc(cfg(...))` attribute.
macro_rules! cfg_config {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "config")]
            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
            $gated
        )*
    };
}

// Wraps any number of items so that each one is compiled when at least one of the
// `config` or `client` features is enabled. When built with `--cfg docsrs`, each
// item additionally receives the matching `doc(cfg(...))` attribute.
macro_rules! cfg_error {
    ($($gated:item)*) => {
        $(
            #[cfg(any(feature = "config", feature = "client"))]
            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
            $gated
        )*
    };
}

// Modules and root-level re-exports that are only compiled when the `client`
// feature is enabled (see the `cfg_client!` macro).
cfg_client! {
    pub mod api;
    pub mod discovery;
    pub mod client;

    // Re-export the main entry point of each module at the crate root.
    #[doc(inline)]
    pub use api::Api;
    #[doc(inline)]
    pub use client::Client;
    #[doc(inline)]
    pub use discovery::Discovery;
}

// The `config` module and its root-level `Config` re-export, only compiled when
// the `config` feature is enabled (see the `cfg_config!` macro).
cfg_config! {
    pub mod config;
    #[doc(inline)]
    pub use config::Config;
}

// The error module, its root-level `Error` re-export and the crate `Result` alias;
// compiled when either the `config` or the `client` feature is enabled
// (see the `cfg_error!` macro).
cfg_error! {
    pub mod error;
    #[doc(inline)] pub use error::Error;
    /// Convenient alias for `Result<T, Error>`
    pub type Result<T, E = Error> = std::result::Result<T, E>;
}

// Root-level re-exports of the resource traits; the `crate::core` path used here
// is the `kube_core` alias declared on the next item.
pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
/// Re-exports from `kube_core`
pub use kube_core as core;


// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube-client --lib --features=rustls-tls,ws -- --ignored`
#[cfg(all(feature = "client", feature = "config"))]
#[cfg(test)]
mod test {
    #![allow(unused_imports)]
    use crate::{
        api::{AttachParams, AttachedProcess},
        client::ConfigExt,
        Api, Client, Config, ResourceExt,
    };
    use futures::{StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::Pod;
    use kube_core::{
        params::{DeleteParams, Patch},
        response::StatusSummary,
    };
    use serde_json::json;
    use tower::ServiceBuilder;

    // hard disabled test atm due to k3d rustls issues: https://github.com/kube-rs/kube/issues?q=is%3Aopen+is%3Aissue+label%3Arustls
    //
    // Assembles a client by hand from the inferred config and a rustls HTTPS
    // connector, then lists pods through it.
    #[cfg(all(feature = "when_rustls_works_with_k3d", feature = "rustls-tls"))]
    #[tokio::test]
    #[ignore] // needs cluster (lists pods)
    async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
        let inferred = Config::infer().await?;
        let connector = inferred.rustls_https_connector()?;

        // hyper client wrapped in the layer built from the inferred config
        let http = hyper::Client::builder().build(connector);
        let stack = ServiceBuilder::new()
            .layer(inferred.base_uri_layer())
            .service(http);

        let pod_api: Api<Pod> = Api::default_namespaced(Client::new(stack, inferred.default_namespace));
        pod_api.list(&Default::default()).await?;
        Ok(())
    }

    // Assembles a client by hand from the inferred config and an OpenSSL HTTPS
    // connector, then lists pods through it.
    #[tokio::test]
    #[ignore] // needs cluster (lists pods)
    #[cfg(feature = "openssl-tls")]
    async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
        let inferred = Config::infer().await?;
        let connector = inferred.openssl_https_connector()?;

        // hyper client wrapped in the layer built from the inferred config
        let http = hyper::Client::builder().build(connector);
        let stack = ServiceBuilder::new()
            .layer(inferred.base_uri_layer())
            .service(http);

        let pod_api: Api<Pod> = Api::default_namespaced(Client::new(stack, inferred.default_namespace));
        pod_api.list(&Default::default()).await?;
        Ok(())
    }

    // Discovers a single API group in one call, then lists objects of the kind
    // that discovery recommends through a dynamically typed `Api`.
    // NOTE(review): this test is gated on a `discovery` cargo feature - confirm that
    // feature is declared in Cargo.toml; if it is not, the test is never compiled.
    #[tokio::test]
    #[ignore] // needs cluster (lists api resources)
    #[cfg(feature = "discovery")]
    async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
        use crate::{core::DynamicObject, discovery};

        let client = Client::try_default().await?;
        let group = discovery::group(&client, "apiregistration.k8s.io").await?;
        let (resource, _capabilities) = group.recommended_kind("APIService").unwrap();

        let dynamic_api = Api::<DynamicObject>::all_with(client.clone(), &resource);
        dynamic_api.list(&Default::default()).await?;

        Ok(())
    }

    #[tokio::test]
    #[ignore] // needs cluster (will create and edit a pod)
    async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
        use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};

        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::default_namespaced(client);

        // create busybox pod that's alive for at most 30s
        let p: Pod = serde_json::from_value(json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "busybox-kube1",
                "labels": { "app": "kube-rs-test" },
            },
            "spec": {
                "terminationGracePeriodSeconds": 1,
                "restartPolicy": "Never",
                "containers": [{
                  "name": "busybox",
                  "image": "busybox:1.34.1",
                  "command": ["sh", "-c", "sleep 30"],
                }],
            }
        }))?;

        let pp = PostParams::default();
        match pods.create(&pp, &p).await {
            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
            Err(e) => return Err(e.into()),                         // any other case if a failure
        }

        // Manual watch-api for it to become ready
        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
        let lp = ListParams::default()
            .fields(&format!("metadata.name={}", "busybox-kube1"))
            .timeout(15);
        let mut stream = pods.watch(&lp, "0").await?.boxed();
        while let Some(ev) = stream.try_next().await? {
            // can debug format watch event
            let _ = format!("we: {ev:?}");
            match ev {
                WatchEvent::Modified(o) => {
                    let s = o.status.as_ref().expect("status exists on pod");
                    let phase = s.phase.clone().unwrap_or_default();
                    if phase == "Running" {
                        break;
                    }
                }
                WatchEvent::Error(e) => panic!("watch error: {e}"),
                _ => {}
            }
        }

        // Verify we can get it
        let mut pod = pods.get("busybox-kube1").await?;
        assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");

        // verify replace with explicit resource version
        // NB: don't do this; use server side apply
        {
            assert!(pod.resource_version().is_some());
            pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);

            let pp = PostParams::default();
            let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
            assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
        }

        // Delete it
        let dp = DeleteParams::default();
        pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
            assert_eq!(pdel.name_unchecked(), "busybox-kube1");
        });

        Ok(())
    }

    // Executes commands inside a running pod: first a one-shot command whose stdout
    // is collected in full, then an interactive shell that is driven through stdin.
    #[tokio::test]
    #[ignore] // needs cluster (will create and attach to a pod)
    #[cfg(feature = "ws")]
    async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
        use crate::api::{DeleteParams, ListParams, WatchEvent};

        let pod_api: Api<Pod> = Api::default_namespaced(Client::try_default().await?);

        // busybox pod that sleeps for 30s and is never restarted
        let wanted: Pod = serde_json::from_value(json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "busybox-kube2",
                "labels": { "app": "kube-rs-test" },
            },
            "spec": {
                "terminationGracePeriodSeconds": 1,
                "restartPolicy": "Never",
                "containers": [{
                  "name": "busybox",
                  "image": "busybox:1.34.1",
                  "command": ["sh", "-c", "sleep 30"],
                }],
            }
        }))?;

        match pod_api.create(&Default::default(), &wanted).await {
            Ok(created) => assert_eq!(wanted.name_unchecked(), created.name_unchecked()),
            // a conflict means an earlier run did not clean up; carry on with that pod
            Err(crate::Error::Api(api_err)) => assert_eq!(api_err.code, 409),
            Err(other) => return Err(other.into()),
        }

        // Hand-rolled watch until the pod reports the Running phase
        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
        let selector = format!("metadata.name={}", "busybox-kube2");
        let watch_params = ListParams::default().fields(&selector).timeout(15);
        let mut events = pod_api.watch(&watch_params, "0").await?.boxed();
        while let Some(event) = events.try_next().await? {
            match event {
                WatchEvent::Error(e) => panic!("watch error: {e}"),
                WatchEvent::Modified(updated) => {
                    let pod_status = updated.status.as_ref().expect("status exists on pod");
                    if pod_status.phase.as_deref() == Some("Running") {
                        break;
                    }
                }
                _ => {}
            }
        }

        // One-shot exec: collect everything the command prints
        {
            let command = vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"];
            let attach = AttachParams::default().stderr(false);
            let mut process = pod_api.exec("busybox-kube2", command, &attach).await?;

            let chunks = tokio_util::io::ReaderStream::new(process.stdout().unwrap());
            let pieces = chunks
                .filter_map(|chunk| async { chunk.ok().and_then(|bytes| String::from_utf8(bytes.to_vec()).ok()) })
                .collect::<Vec<_>>()
                .await;
            let out = pieces.join("");

            process.join().await.unwrap();
            assert_eq!(out.lines().count(), 3);
            assert_eq!(out, "1\n2\n3\n");
        }

        // Interactive exec: feed a shell through stdin
        {
            use tokio::io::AsyncWriteExt;

            let attach = AttachParams::default().stdin(true).stderr(false);
            let mut shell = pod_api.exec("busybox-kube2", vec!["sh"], &attach).await?;
            let mut stdin_writer = shell.stdin().unwrap();
            let mut stdout_stream = tokio_util::io::ReaderStream::new(shell.stdout().unwrap());

            // the read future is created before the command is written, as in the
            // established ordering of this test
            let next_stdout = stdout_stream.next();
            stdin_writer.write_all(b"echo test string 1\n").await?;
            let echoed = next_stdout.await.unwrap().unwrap();
            let stdout = String::from_utf8(echoed.to_vec()).unwrap();
            println!("{stdout}");
            assert_eq!(stdout, "test string 1\n");

            // AttachedProcess resolves with status object.
            // Send `exit 1` to get a failure status.
            stdin_writer.write_all(b"exit 1\n").await?;
            let exit_status = shell.take_status().unwrap();
            if let Some(status) = exit_status.await {
                println!("{status:?}");
                assert_eq!(status.status, Some("Failure".to_owned()));
                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
            }
        }

        // Clean up; the left branch carries the pod that is being deleted
        let removal = pod_api.delete("busybox-kube2", &DeleteParams::default()).await?;
        removal.map_left(|pdel| {
            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
        });

        Ok(())
    }

    #[tokio::test]
    #[ignore] // needs cluster (will create and tail logs from a pod)
    async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
        use crate::{
            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
            core::subresource::LogParams,
        };

        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::default_namespaced(client);

        // create busybox pod that's alive for at most 30s
        let p: Pod = serde_json::from_value(json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "busybox-kube3",
                "labels": { "app": "kube-rs-test" },
            },
            "spec": {
                "terminationGracePeriodSeconds": 1,
                "restartPolicy": "Never",
                "containers": [{
                  "name": "busybox",
                  "image": "busybox:1.34.1",
                  "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
                }],
            }
        }))?;

        match pods.create(&Default::default(), &p).await {
            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
            Err(e) => return Err(e.into()),                         // any other case if a failure
        }

        // Manual watch-api for it to become ready
        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
        let lp = ListParams::default()
            .fields(&format!("metadata.name={}", "busybox-kube3"))
            .timeout(15);
        let mut stream = pods.watch(&lp, "0").await?.boxed();
        while let Some(ev) = stream.try_next().await? {
            match ev {
                WatchEvent::Modified(o) => {
                    let s = o.status.as_ref().expect("status exists on pod");
                    let phase = s.phase.clone().unwrap_or_default();
                    if phase == "Running" {
                        break;
                    }
                }
                WatchEvent::Error(e) => panic!("watch error: {e}"),
                _ => {}
            }
        }

        // Get current list of logs
        let lp = LogParams {
            follow: true,
            ..LogParams::default()
        };
        let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.boxed();
        let log_line = logs_stream.try_next().await?.unwrap();
        assert_eq!(log_line, "kube 1\n");

        // wait for container to finish
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
        assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");

        // remaining logs should have been buffered internally
        assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 2\n");
        assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 3\n");
        assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 4\n");
        assert_eq!(logs_stream.try_next().await?.unwrap(), "kube 5\n");

        // evict the pod
        let ep = EvictParams::default();
        let eres = pods.evict("busybox-kube3", &ep).await?;
        assert_eq!(eres.code, 201); // created
        assert!(eres.is_success());

        Ok(())
    }

    #[tokio::test]
    #[ignore] // needs cluster (will create a CertificateSigningRequest)
    async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
        use crate::api::PostParams;
        use k8s_openapi::api::certificates::v1::{
            CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
        };

        let csr_name = "fake";
        let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
            "apiVersion": "certificates.k8s.io/v1",
            "kind": "CertificateSigningRequest",
            "metadata": { "name": csr_name },
            "spec": {
                "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
                "signerName": "kubernetes.io/kube-apiserver-client",
                "expirationSeconds": 86400,
                "usages": ["client auth"]
            }
        }))?;

        let client = Client::try_default().await?;
        let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
        assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());

        // Patch the approval and approve the CSR
        let approval_type = "ApprovedFake";
        let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
            certificate: None,
            conditions: Some(vec![CertificateSigningRequestCondition {
                type_: approval_type.to_string(),
                last_update_time: None,
                last_transition_time: None,
                message: Some(format!("{} {}", approval_type, "by kube-rs client")),
                reason: Some("kube-rsClient".to_string()),
                status: "True".to_string(),
            }]),
        };
        let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
        let _ = csr
            .patch_approval(csr_name, &Default::default(), &csr_status_patch)
            .await?;
        let csr_after_approval = csr.get_approval(csr_name).await?;

        assert_eq!(
            csr_after_approval
                .status
                .as_ref()
                .unwrap()
                .conditions
                .as_ref()
                .unwrap()[0]
                .type_,
            approval_type.to_string()
        );
        csr.delete(csr_name, &DeleteParams::default()).await?;
        Ok(())
    }
}