pipedash-plugin-tekton 0.1.1

Tekton CD plugin for Pipedash
Documentation
use std::path::PathBuf;

use kube::Client;
use pipedash_plugin_api::{
    PluginError,
    PluginResult,
    RetryPolicy,
};

use crate::{
    config,
    types::{
        PipelineList,
        PipelineRunList,
        TektonPipeline,
        TektonPipelineRun,
    },
};

/// Client for Tekton resources, pairing a Kubernetes [`Client`] with the
/// [`RetryPolicy`] that most of its request methods run through.
pub struct TektonClient {
    /// Kubernetes API client used to issue every request made by this type.
    client: Client,
    /// Policy whose `retry` method wraps the retried request closures.
    retry_policy: RetryPolicy,
}

impl TektonClient {
    /// Reads every kubeconfig file named in `paths` and combines them into a
    /// single [`kube::config::Kubeconfig`].
    ///
    /// Paths that do not exist, and files that fail to load as a kubeconfig,
    /// are skipped. The clusters, auth infos and contexts of the remaining
    /// files are concatenated in input order, and `current_context` is taken
    /// from the first loaded file that sets one. Every other field of the
    /// result is left as `None`.
    ///
    /// # Errors
    ///
    /// Returns [`PluginError::InvalidConfig`] when not a single file could be
    /// loaded.
    fn merge_kubeconfigs(paths: Vec<String>) -> PluginResult<kube::config::Kubeconfig> {
        use kube::config::Kubeconfig;

        // Best-effort load: missing or unreadable files are dropped here.
        let loaded: Vec<Kubeconfig> = paths
            .into_iter()
            .map(PathBuf::from)
            .filter(|candidate| candidate.exists())
            .filter_map(|candidate| Kubeconfig::read_from(&candidate).ok())
            .collect();

        if loaded.is_empty() {
            return Err(PluginError::InvalidConfig(
                "No valid kubeconfig files found".to_string(),
            ));
        }

        let mut combined = Kubeconfig {
            preferences: None,
            clusters: Vec::new(),
            auth_infos: Vec::new(),
            contexts: Vec::new(),
            current_context: None,
            extensions: None,
            kind: None,
            api_version: None,
        };

        for kubeconfig in loaded {
            combined.clusters.extend(kubeconfig.clusters);
            combined.auth_infos.extend(kubeconfig.auth_infos);
            combined.contexts.extend(kubeconfig.contexts);
            // The first file that names a current context wins.
            combined.current_context = combined
                .current_context
                .take()
                .or(kubeconfig.current_context);
        }

        Ok(combined)
    }

    /// Builds a [`TektonClient`] from kubeconfig files on disk.
    ///
    /// `kubeconfig_path` is split with `config::split_kubeconfig_paths`; when
    /// it is `None`, the value of `config::get_default_kubeconfig_path()` is
    /// split instead. The resulting files are merged, and `context` (if given)
    /// is passed to kube as the context to select.
    ///
    /// # Errors
    ///
    /// Returns [`PluginError::InvalidConfig`] when no kubeconfig file can be
    /// loaded, when kube rejects the merged configuration, or when the
    /// Kubernetes client cannot be constructed.
    pub async fn from_kubeconfig(
        kubeconfig_path: Option<&str>, context: Option<&str>,
    ) -> PluginResult<Self> {
        let candidate_paths = match kubeconfig_path {
            Some(explicit) => config::split_kubeconfig_paths(explicit),
            None => {
                let fallback = config::get_default_kubeconfig_path();
                config::split_kubeconfig_paths(&fallback)
            }
        };
        let merged = Self::merge_kubeconfigs(candidate_paths)?;

        // A `None` context leaves the options at their defaults.
        let options = kube::config::KubeConfigOptions {
            context: context.map(String::from),
            ..Default::default()
        };

        let kube_config = match kube::Config::from_custom_kubeconfig(merged, &options).await {
            Ok(loaded) => loaded,
            Err(e) => {
                return Err(PluginError::InvalidConfig(format!(
                    "Failed to load kubeconfig: {}",
                    e
                )));
            }
        };

        let client = match Client::try_from(kube_config) {
            Ok(built) => built,
            Err(e) => {
                return Err(PluginError::InvalidConfig(format!(
                    "Failed to create Kubernetes client: {}",
                    e
                )));
            }
        };

        Ok(Self {
            client,
            retry_policy: RetryPolicy::default(),
        })
    }

    /// Lists the names of all namespaces in the cluster, running the request
    /// through `self.retry_policy`.
    ///
    /// Namespaces whose metadata carries no name are omitted.
    ///
    /// # Errors
    ///
    /// Returns [`PluginError::ApiError`] when the list call fails.
    pub async fn list_namespaces(&self) -> PluginResult<Vec<String>> {
        use k8s_openapi::api::core::v1::Namespace;
        use kube::api::{
            Api,
            ListParams,
        };

        self.retry_policy
            .retry(|| async {
                let api: Api<Namespace> = Api::all(self.client.clone());

                let listing = api.list(&ListParams::default()).await.map_err(|e| {
                    PluginError::ApiError(format!("Failed to list namespaces: {}", e))
                })?;

                let mut names = Vec::new();
                for ns in listing.items {
                    if let Some(name) = ns.metadata.name {
                        names.push(name);
                    }
                }
                Ok(names)
            })
            .await
    }

    pub async fn try_list_namespaces_cluster_wide(&self) -> PluginResult<Vec<String>> {
        use kube::api::{
            Api,
            ListParams,
        };

        let namespaces_api: Api<k8s_openapi::api::core::v1::Namespace> =
            Api::all(self.client.clone());

        match namespaces_api.list(&ListParams::default()).await {
            Ok(namespaces) => Ok(namespaces
                .items
                .into_iter()
                .filter_map(|ns| ns.metadata.name)
                .collect()),
            Err(e) => {
                if let kube::Error::Api(api_error) = &e {
                    if api_error.code == 403 {
                        return Err(PluginError::ApiError(
                            "Missing cluster-wide namespace permissions. Please use 'custom' mode and specify namespaces manually in the configuration.".to_string()
                        ));
                    }
                }
                Err(PluginError::ApiError(format!(
                    "Failed to list namespaces: {}",
                    e
                )))
            }
        }
    }

    /// Returns the entries of `namespaces` that contain at least one Tekton
    /// pipeline.
    ///
    /// Every namespace is probed concurrently with one `list_pipelines` call.
    /// A namespace is kept only when that call succeeds *and* returns a
    /// non-empty list; namespaces whose listing fails are dropped rather than
    /// reported. Results follow the order of `namespaces`, since `join_all`
    /// yields outputs in input order.
    async fn filter_namespaces_with_pipelines(&self, namespaces: &[String]) -> Vec<String> {
        use futures::future::join_all;

        let checks = namespaces.iter().map(|namespace| async move {
            match self.list_pipelines(namespace).await {
                // Only a successful, non-empty listing qualifies. A merely
                // reachable API with zero pipelines must not pass the filter.
                Ok(pipelines) if !pipelines.is_empty() => Some(namespace.clone()),
                _ => None,
            }
        });

        join_all(checks).await.into_iter().flatten().collect()
    }

    /// Lists all namespaces in the cluster and narrows them down with
    /// `filter_namespaces_with_pipelines`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `list_namespaces`; the filtering step itself
    /// does not fail.
    pub async fn list_namespaces_with_pipelines(&self) -> PluginResult<Vec<String>> {
        let candidates = self.list_namespaces().await?;
        let with_pipelines = self.filter_namespaces_with_pipelines(&candidates).await;
        Ok(with_pipelines)
    }

    /// Checks the given namespaces concurrently and returns those that contain
    /// at least one Tekton pipeline, in input order.
    ///
    /// Namespaces that list successfully but are empty are silently left out.
    /// Per-namespace failures are only surfaced when no namespace qualifies.
    ///
    /// # Errors
    ///
    /// Returns [`PluginError::InvalidConfig`] when:
    /// - `namespaces` is empty;
    /// - no namespace has pipelines and every listing succeeded;
    /// - no namespace has pipelines and at least one listing failed (the
    ///   message joins the individual failures with `"; "`).
    pub async fn validate_namespaces_have_pipelines(
        &self, namespaces: &[String],
    ) -> PluginResult<Vec<String>> {
        use futures::future::join_all;

        if namespaces.is_empty() {
            return Err(PluginError::InvalidConfig(
                "No namespaces specified. Please provide at least one namespace in the 'namespaces' field.".to_string()
            ));
        }

        let probes = namespaces.iter().map(|namespace| async move {
            let outcome = self.list_pipelines(namespace).await;
            (namespace, outcome)
        });

        let mut usable = Vec::new();
        let mut failures = Vec::new();

        for (namespace, outcome) in join_all(probes).await {
            match outcome {
                Ok(pipelines) if pipelines.is_empty() => {}
                Ok(_) => usable.push(namespace.clone()),
                Err(e) => failures.push(format!(
                    "Failed to access namespace '{}': {}",
                    namespace, e
                )),
            }
        }

        if !usable.is_empty() {
            return Ok(usable);
        }

        // Nothing qualified: explain why, preferring concrete access errors.
        let message = if failures.is_empty() {
            format!("No Tekton pipelines found in any of the specified namespaces: {:?}. Verify that Tekton is installed and pipelines exist in these namespaces.", namespaces)
        } else {
            format!(
                "Failed to validate namespaces. Errors: {}",
                failures.join("; ")
            )
        };

        Err(PluginError::InvalidConfig(message))
    }

    /// Returns `true` when `namespace` contains at least one Tekton pipeline.
    ///
    /// A failed listing and a successful but empty listing both yield `false`.
    async fn has_pipelines(&self, namespace: &str) -> bool {
        // A successful call alone is not enough: the list must be non-empty.
        matches!(
            self.list_pipelines(namespace).await,
            Ok(pipelines) if !pipelines.is_empty()
        )
    }

    /// Issues a GET request for `url` through the Kubernetes client and
    /// deserializes the JSON response body into `T`.
    ///
    /// The whole build/send/parse sequence runs inside `self.retry_policy`.
    ///
    /// # Errors
    ///
    /// - [`PluginError::Internal`] when the HTTP request cannot be built.
    /// - [`PluginError::ApiError`] when the request fails.
    /// - [`PluginError::SerializationError`] when the body is not valid JSON
    ///   for `T`.
    async fn request_json<T: serde::de::DeserializeOwned>(&self, url: &str) -> PluginResult<T> {
        let target = url.to_owned();

        self.retry_policy
            .retry(|| async {
                let built = http::Request::get(target.as_str()).body(Vec::new());
                let request = built.map_err(|e| {
                    PluginError::Internal(format!("Failed to build request: {}", e))
                })?;

                let sent = self.client.request_text(request).await;
                let raw = sent.map_err(|e| {
                    PluginError::ApiError(format!("Failed to make request: {}", e))
                })?;

                serde_json::from_str(&raw).map_err(|e| {
                    PluginError::SerializationError(format!("Failed to parse response: {}", e))
                })
            })
            .await
    }

    /// Issues a POST request to `url` with `body` as a JSON payload and
    /// deserializes the JSON response body into `T`.
    ///
    /// The payload is serialized once, before the retry loop; each attempt
    /// made through `self.retry_policy` sends a copy of the same bytes.
    ///
    /// NOTE(review): the POST is re-sent whenever `retry_policy` decides to
    /// retry — confirm that is acceptable for non-idempotent create calls.
    ///
    /// # Errors
    ///
    /// - [`PluginError::SerializationError`] when `body` cannot be serialized
    ///   or the response is not valid JSON for `T`.
    /// - [`PluginError::Internal`] when the HTTP request cannot be built.
    /// - [`PluginError::ApiError`] when the request fails.
    async fn post_json<T: serde::de::DeserializeOwned>(
        &self, url: &str, body: &serde_json::Value,
    ) -> PluginResult<T> {
        let url = url.to_string();

        // Loop-invariant work: serialize up front instead of deep-cloning the
        // JSON value and re-encoding it on every attempt.
        let payload = serde_json::to_vec(body).map_err(|e| {
            PluginError::SerializationError(format!("Failed to serialize request: {}", e))
        })?;

        self.retry_policy
            .retry(|| async {
                let request = http::Request::builder()
                    .uri(&url)
                    .method(http::Method::POST)
                    .header("Content-Type", "application/json")
                    // The request consumes its body, so each attempt gets a copy.
                    .body(payload.clone())
                    .map_err(|e| {
                        PluginError::Internal(format!("Failed to build request: {}", e))
                    })?;

                let response_body =
                    self.client.request_text(request).await.map_err(|e| {
                        PluginError::ApiError(format!("Failed to make request: {}", e))
                    })?;

                serde_json::from_str(&response_body).map_err(|e| {
                    PluginError::SerializationError(format!("Failed to parse response: {}", e))
                })
            })
            .await
    }

    /// Fetches the `tekton.dev/v1` pipelines of `namespace` and returns the
    /// `items` of the resulting list.
    ///
    /// # Errors
    ///
    /// Propagates any error from `request_json`.
    pub async fn list_pipelines(&self, namespace: &str) -> PluginResult<Vec<TektonPipeline>> {
        let endpoint = format!("/apis/tekton.dev/v1/namespaces/{}/pipelines", namespace);
        self.request_json::<PipelineList>(&endpoint)
            .await
            .map(|listing| listing.items)
    }

    /// Fetches the single `tekton.dev/v1` pipeline called `name` from
    /// `namespace`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `request_json`.
    pub async fn get_pipeline(&self, namespace: &str, name: &str) -> PluginResult<TektonPipeline> {
        let endpoint = format!(
            "/apis/tekton.dev/v1/namespaces/{ns}/pipelines/{pipeline}",
            ns = namespace,
            pipeline = name
        );
        let pipeline: TektonPipeline = self.request_json(&endpoint).await?;
        Ok(pipeline)
    }

    /// Fetches the `tekton.dev/v1` pipeline runs of `namespace`.
    ///
    /// When `pipeline_name` is given, the request adds a
    /// `tekton.dev/pipeline=<name>` label selector; otherwise every run in the
    /// namespace is requested.
    ///
    /// # Errors
    ///
    /// Propagates any error from `request_json`.
    pub async fn list_pipelineruns(
        &self, namespace: &str, pipeline_name: Option<&str>,
    ) -> PluginResult<Vec<TektonPipelineRun>> {
        let mut endpoint = format!("/apis/tekton.dev/v1/namespaces/{}/pipelineruns", namespace);

        // Narrow the listing to one pipeline via its label, if requested.
        if let Some(pipeline) = pipeline_name {
            endpoint.push_str("?labelSelector=tekton.dev/pipeline=");
            endpoint.push_str(pipeline);
        }

        self.request_json::<PipelineRunList>(&endpoint)
            .await
            .map(|listing| listing.items)
    }

    /// Creates `pipelinerun` in `namespace` by POSTing its JSON form to the
    /// `tekton.dev/v1` pipelineruns endpoint, and returns the object parsed
    /// from the response.
    ///
    /// # Errors
    ///
    /// Returns [`PluginError::SerializationError`] when `pipelinerun` cannot
    /// be converted to JSON; otherwise propagates any error from `post_json`.
    pub async fn create_pipelinerun(
        &self, namespace: &str, pipelinerun: &TektonPipelineRun,
    ) -> PluginResult<TektonPipelineRun> {
        let payload = match serde_json::to_value(pipelinerun) {
            Ok(value) => value,
            Err(e) => {
                return Err(PluginError::SerializationError(format!(
                    "Failed to serialize pipelinerun: {}",
                    e
                )));
            }
        };

        let endpoint = format!("/apis/tekton.dev/v1/namespaces/{}/pipelineruns", namespace);
        self.post_json(&endpoint, &payload).await
    }

    /// Deletes the pipeline run called `name` from `namespace` with a DELETE
    /// request run through `self.retry_policy`.
    ///
    /// The response body is not inspected.
    ///
    /// # Errors
    ///
    /// - [`PluginError::Internal`] when the HTTP request cannot be built.
    /// - [`PluginError::ApiError`] when the request fails.
    pub async fn delete_pipelinerun(&self, namespace: &str, name: &str) -> PluginResult<()> {
        let endpoint = format!(
            "/apis/tekton.dev/v1/namespaces/{}/pipelineruns/{}",
            namespace, name
        );

        self.retry_policy
            .retry(|| async {
                let built = http::Request::delete(endpoint.as_str()).body(Vec::new());
                let request = built.map_err(|e| {
                    PluginError::Internal(format!("Failed to build request: {}", e))
                })?;

                // Only success or failure matters; the returned text is dropped.
                let _response = self.client.request_text(request).await.map_err(|e| {
                    PluginError::ApiError(format!("Failed to delete pipelinerun: {}", e))
                })?;

                Ok(())
            })
            .await
    }
}