pipedash_plugin_tekton/
plugin.rs

1use std::collections::HashMap;
2use std::sync::OnceLock;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9    client,
10    config,
11    mapper,
12    metadata,
13    types,
14};
15
16pub struct TektonPlugin {
17    metadata: PluginMetadata,
18    client: OnceLock<client::TektonClient>,
19    provider_id: Option<i64>,
20    config: HashMap<String, String>,
21}
22
23impl Default for TektonPlugin {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl TektonPlugin {
30    pub fn new() -> Self {
31        Self {
32            metadata: metadata::create_metadata(),
33            client: OnceLock::new(),
34            provider_id: None,
35            config: HashMap::new(),
36        }
37    }
38
39    async fn client(&self) -> PluginResult<&client::TektonClient> {
40        if let Some(client) = self.client.get() {
41            return Ok(client);
42        }
43
44        let kubeconfig_path = config::get_kubeconfig_path(&self.config);
45        let context = config::get_context(&self.config);
46
47        let new_client =
48            client::TektonClient::from_kubeconfig(kubeconfig_path.as_deref(), context.as_deref())
49                .await?;
50
51        Ok(self.client.get_or_init(|| new_client))
52    }
53
54    async fn fetch_all_pipelines_in_namespaces(&self) -> PluginResult<Vec<types::TektonPipeline>> {
55        let client = self.client().await?;
56
57        let selected_ids = config::get_selected_pipelines(&self.config);
58
59        let namespaces = if selected_ids.is_empty() {
60            let namespace_mode = config::get_namespace_mode(&self.config);
61
62            match namespace_mode {
63                config::NamespaceMode::Custom => config::get_namespaces(&self.config),
64                config::NamespaceMode::All => client.list_namespaces_with_pipelines().await?,
65            }
66        } else {
67            let unique_namespaces: std::collections::HashSet<String> = selected_ids
68                .iter()
69                .filter_map(|id| {
70                    config::parse_pipeline_id(id)
71                        .ok()
72                        .map(|(_provider_id, namespace, _pipeline_name)| namespace)
73                })
74                .collect();
75            unique_namespaces.into_iter().collect()
76        };
77
78        let pipeline_futures = namespaces
79            .iter()
80            .map(|namespace| async move { client.list_pipelines(namespace).await.ok() });
81
82        let results: Vec<Option<Vec<types::TektonPipeline>>> = join_all(pipeline_futures).await;
83
84        let all_pipelines: Vec<types::TektonPipeline> =
85            results.into_iter().flatten().flatten().collect();
86
87        if selected_ids.is_empty() {
88            Ok(all_pipelines)
89        } else {
90            Ok(all_pipelines
91                .into_iter()
92                .filter(|p| {
93                    let id = format!("{}__{}", p.metadata.namespace, p.metadata.name);
94                    selected_ids.contains(&id)
95                })
96                .collect())
97        }
98    }
99
100    async fn fetch_latest_run_for_pipeline(
101        &self, namespace: &str, pipeline_name: &str,
102    ) -> Option<types::TektonPipelineRun> {
103        let client = self.client().await.ok()?;
104        let mut runs = client
105            .list_pipelineruns(namespace, Some(pipeline_name))
106            .await
107            .ok()?;
108
109        runs.sort_by(|a, b| {
110            let a_time = types::parse_timestamp(&a.metadata.creation_timestamp);
111            let b_time = types::parse_timestamp(&b.metadata.creation_timestamp);
112            b_time.cmp(&a_time)
113        });
114
115        runs.into_iter().next()
116    }
117
118    fn get_available_contexts(&self, kubeconfig_path: Option<&str>) -> PluginResult<Vec<String>> {
119        use std::collections::HashSet;
120        use std::path::PathBuf;
121
122        let paths = if let Some(path_str) = kubeconfig_path {
123            config::split_kubeconfig_paths(path_str)
124        } else {
125            let default_path = config::get_default_kubeconfig_path();
126            config::split_kubeconfig_paths(&default_path)
127        };
128
129        let mut all_contexts = HashSet::new();
130
131        for path_str in paths {
132            let path = PathBuf::from(&path_str);
133            if !path.exists() {
134                continue;
135            }
136
137            match kube::config::Kubeconfig::read_from(&path) {
138                Ok(kubeconfig) => {
139                    for context in kubeconfig.contexts {
140                        all_contexts.insert(context.name);
141                    }
142                }
143                Err(_) => continue,
144            }
145        }
146
147        if all_contexts.is_empty() {
148            return Err(PluginError::InvalidConfig(
149                "No valid kubeconfig files found or no contexts available".to_string(),
150            ));
151        }
152
153        let mut contexts: Vec<String> = all_contexts.into_iter().collect();
154        contexts.sort();
155        Ok(contexts)
156    }
157}
158
159#[async_trait]
160impl Plugin for TektonPlugin {
161    fn metadata(&self) -> &PluginMetadata {
162        &self.metadata
163    }
164
165    fn provider_type(&self) -> &str {
166        "tekton"
167    }
168
169    fn initialize(
170        &mut self, provider_id: i64, config: HashMap<String, String>,
171        _http_client: Option<std::sync::Arc<reqwest::Client>>,
172    ) -> PluginResult<()> {
173        self.provider_id = Some(provider_id);
174        self.config = config;
175        Ok(())
176    }
177
178    async fn validate_credentials(&self) -> PluginResult<bool> {
179        let client = self.client().await?;
180        let namespace_mode = config::get_namespace_mode(&self.config);
181
182        let namespaces = match namespace_mode {
183            config::NamespaceMode::Custom => {
184                let manual_namespaces = config::get_namespaces(&self.config);
185
186                if manual_namespaces.is_empty() {
187                    return Err(PluginError::InvalidConfig(
188                        "Namespace mode is set to 'custom' but no namespaces are specified. Please provide at least one namespace in the 'namespaces' field (e.g., 'default,tekton-pipelines').".to_string(),
189                    ));
190                }
191
192                client
193                    .validate_namespaces_have_pipelines(&manual_namespaces)
194                    .await?
195            }
196            config::NamespaceMode::All => match client.try_list_namespaces_cluster_wide().await {
197                Ok(all_namespaces) => {
198                    if all_namespaces.is_empty() {
199                        return Err(PluginError::InvalidConfig(
200                                "No namespaces found in the cluster. Please verify your cluster connection and permissions.".to_string(),
201                            ));
202                    }
203                    client.list_namespaces_with_pipelines().await?
204                }
205                Err(e) => return Err(e),
206            },
207        };
208
209        if namespaces.is_empty() {
210            let hint = match namespace_mode {
211                config::NamespaceMode::Custom => "Verify that the specified namespaces exist and contain Tekton pipelines, and that you have permissions to access them.",
212                config::NamespaceMode::All => "Try switching to 'custom' namespace mode and manually specify the namespaces containing your Tekton pipelines.",
213            };
214
215            return Err(PluginError::InvalidConfig(format!(
216                "No Tekton pipelines found in any accessible namespace. {}",
217                hint
218            )));
219        }
220
221        Ok(true)
222    }
223
224    async fn fetch_available_pipelines(
225        &self, params: Option<PaginationParams>,
226    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
227        let params = params.unwrap_or_default();
228        let pipelines = self.fetch_all_pipelines_in_namespaces().await?;
229        let all_pipelines: Vec<_> = pipelines
230            .iter()
231            .map(mapper::map_available_pipeline)
232            .collect();
233
234        let total_count = all_pipelines.len();
235        let start = ((params.page - 1) * params.page_size).min(total_count);
236        let end = (start + params.page_size).min(total_count);
237        let items = all_pipelines[start..end].to_vec();
238
239        Ok(PaginatedResponse::new(
240            items,
241            params.page,
242            params.page_size,
243            total_count,
244        ))
245    }
246
247    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
248        let provider_id = self
249            .provider_id
250            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
251
252        let pipelines = self.fetch_all_pipelines_in_namespaces().await?;
253
254        let pipeline_futures = pipelines.iter().map(|pipeline| async move {
255            let latest_run = self
256                .fetch_latest_run_for_pipeline(
257                    &pipeline.metadata.namespace,
258                    &pipeline.metadata.name,
259                )
260                .await;
261            mapper::map_pipeline(pipeline, latest_run.as_ref(), provider_id)
262        });
263
264        let results = join_all(pipeline_futures).await;
265        Ok(results)
266    }
267
268    async fn fetch_run_history(
269        &self, pipeline_id: &str, limit: usize,
270    ) -> PluginResult<Vec<PipelineRun>> {
271        let (provider_id, namespace, pipeline_name) = config::parse_pipeline_id(pipeline_id)?;
272        let client = self.client().await?;
273
274        let mut runs = client
275            .list_pipelineruns(&namespace, Some(&pipeline_name))
276            .await?;
277
278        runs.sort_by(|a, b| {
279            let a_time = types::parse_timestamp(&a.metadata.creation_timestamp);
280            let b_time = types::parse_timestamp(&b.metadata.creation_timestamp);
281            b_time.cmp(&a_time)
282        });
283
284        let limited_runs: Vec<types::TektonPipelineRun> = runs.into_iter().take(limit).collect();
285
286        Ok(limited_runs
287            .iter()
288            .map(|run| mapper::map_pipeline_run(run, provider_id))
289            .collect())
290    }
291
292    async fn fetch_run_details(
293        &self, pipeline_id: &str, run_number: i64,
294    ) -> PluginResult<PipelineRun> {
295        let (provider_id, namespace, _pipeline_name) = config::parse_pipeline_id(pipeline_id)?;
296        let client = self.client().await?;
297
298        let runs = client.list_pipelineruns(&namespace, None).await?;
299
300        let run = runs
301            .into_iter()
302            .find(|r| {
303                types::parse_timestamp(&r.metadata.creation_timestamp).map(|dt| dt.timestamp())
304                    == Some(run_number)
305            })
306            .ok_or_else(|| {
307                PluginError::PipelineNotFound(format!(
308                    "PipelineRun with timestamp {} not found",
309                    run_number
310                ))
311            })?;
312
313        Ok(mapper::map_pipeline_run(&run, provider_id))
314    }
315
316    async fn fetch_workflow_parameters(
317        &self, workflow_id: &str,
318    ) -> PluginResult<Vec<WorkflowParameter>> {
319        let (_provider_id, namespace, pipeline_name) = config::parse_pipeline_id(workflow_id)?;
320        let client = self.client().await?;
321
322        let pipeline = client.get_pipeline(&namespace, &pipeline_name).await?;
323
324        Ok(mapper::map_workflow_parameters(&pipeline))
325    }
326
327    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
328        let (_provider_id, namespace, pipeline_name) =
329            config::parse_pipeline_id(&params.workflow_id)?;
330
331        let client = self.client().await?;
332
333        let pipeline = client.get_pipeline(&namespace, &pipeline_name).await?;
334
335        let param_values: Vec<types::ParamValue> = if let Some(inputs) = &params.inputs {
336            inputs
337                .as_object()
338                .map(|obj| {
339                    obj.iter()
340                        .map(|(key, value)| types::ParamValue {
341                            name: key.clone(),
342                            value: value.clone(),
343                        })
344                        .collect()
345                })
346                .unwrap_or_default()
347        } else {
348            vec![]
349        };
350
351        let workspaces: Vec<types::WorkspaceBinding> = pipeline
352            .spec
353            .workspaces
354            .iter()
355            .filter_map(|ws| {
356                if ws.optional.unwrap_or(false) {
357                    None
358                } else {
359                    Some(types::WorkspaceBinding {
360                        name: ws.name.clone(),
361                        empty_dir: Some(serde_json::json!({})),
362                        persistent_volume_claim: None,
363                        config_map: None,
364                        secret: None,
365                    })
366                }
367            })
368            .collect();
369
370        let run_name = format!("{}-{}", pipeline_name, chrono::Utc::now().timestamp());
371
372        let mut annotations = HashMap::new();
373        annotations.insert("tekton.dev/triggeredBy".to_string(), "pipedash".to_string());
374
375        let pipelinerun = types::TektonPipelineRun {
376            api_version: "tekton.dev/v1".to_string(),
377            kind: "PipelineRun".to_string(),
378            metadata: types::ObjectMeta {
379                name: run_name.clone(),
380                namespace: namespace.clone(),
381                creation_timestamp: None,
382                labels: HashMap::new(),
383                annotations,
384            },
385            spec: types::PipelineRunSpec {
386                pipeline_ref: Some(types::PipelineRef {
387                    name: pipeline_name.clone(),
388                }),
389                params: param_values,
390                workspaces,
391                timeout: None,
392                task_run_template: None,
393            },
394            status: types::PipelineRunStatus {
395                conditions: vec![],
396                start_time: None,
397                completion_time: None,
398                task_runs: HashMap::new(),
399                child_references: vec![],
400            },
401        };
402
403        let created_run = client.create_pipelinerun(&namespace, &pipelinerun).await?;
404
405        Ok(format!(
406            "PipelineRun created: {}/{}",
407            namespace, created_run.metadata.name
408        ))
409    }
410
411    async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
412        let (_provider_id, namespace, _pipeline_name) = config::parse_pipeline_id(pipeline_id)?;
413        let client = self.client().await?;
414
415        let runs = client.list_pipelineruns(&namespace, None).await?;
416
417        let matching_runs: Vec<_> = runs
418            .into_iter()
419            .filter(|r| {
420                types::parse_timestamp(&r.metadata.creation_timestamp).map(|dt| dt.timestamp())
421                    == Some(run_number)
422            })
423            .collect();
424
425        if matching_runs.is_empty() {
426            return Err(PluginError::PipelineNotFound(format!(
427                "PipelineRun with timestamp {} not found",
428                run_number
429            )));
430        }
431
432        if matching_runs.len() > 1 {
433            tracing::warn!(
434                run_number = run_number,
435                run_name = %matching_runs[0].metadata.name,
436                count = matching_runs.len(),
437                "Multiple PipelineRuns found with same timestamp, cancelling first one"
438            );
439        }
440
441        let run = &matching_runs[0];
442
443        client
444            .delete_pipelinerun(&namespace, &run.metadata.name)
445            .await?;
446
447        Ok(())
448    }
449
450    async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
451        Ok(vec![Organization {
452            id: "default".to_string(),
453            name: "All Namespaces".to_string(),
454            description: Some("All accessible Kubernetes namespaces".to_string()),
455        }])
456    }
457
458    async fn fetch_available_pipelines_filtered(
459        &self, _org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
460    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
461        let params = params.unwrap_or_default();
462        let pipelines = self.fetch_all_pipelines_in_namespaces().await?;
463        let mut all_pipelines: Vec<_> = pipelines
464            .iter()
465            .map(mapper::map_available_pipeline)
466            .collect();
467
468        if let Some(search_term) = search {
469            let search_lower = search_term.to_lowercase();
470            all_pipelines.retain(|p| {
471                p.name.to_lowercase().contains(&search_lower)
472                    || p.id.to_lowercase().contains(&search_lower)
473                    || p.description
474                        .as_ref()
475                        .is_some_and(|d| d.to_lowercase().contains(&search_lower))
476            });
477        }
478
479        let total_count = all_pipelines.len();
480        let start = ((params.page - 1) * params.page_size).min(total_count);
481        let end = (start + params.page_size).min(total_count);
482        let items = all_pipelines[start..end].to_vec();
483
484        Ok(PaginatedResponse::new(
485            items,
486            params.page,
487            params.page_size,
488            total_count,
489        ))
490    }
491
492    async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
493        Err(PluginError::NotSupported(
494            "Build agents not supported by Tekton plugin".to_string(),
495        ))
496    }
497
498    async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
499        Err(PluginError::NotSupported(
500            "Artifacts not implemented for Tekton plugin".to_string(),
501        ))
502    }
503
504    async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
505        Err(PluginError::NotSupported(
506            "Build queues not supported by Tekton plugin".to_string(),
507        ))
508    }
509
510    fn get_migrations(&self) -> Vec<String> {
511        vec![]
512    }
513
514    async fn get_field_options(
515        &self, field_key: &str, config: &HashMap<String, String>,
516    ) -> PluginResult<Vec<String>> {
517        if field_key == "context" {
518            let kubeconfig_path = config::get_kubeconfig_path(config);
519            let contexts = self.get_available_contexts(kubeconfig_path.as_deref())?;
520            Ok(contexts)
521        } else if field_key == "namespaces" {
522            let kubeconfig_path = config::get_kubeconfig_path(config);
523            let context = config::get_context(config);
524
525            match client::TektonClient::from_kubeconfig(
526                kubeconfig_path.as_deref(),
527                context.as_deref(),
528            )
529            .await
530            {
531                Ok(temp_client) => match temp_client.try_list_namespaces_cluster_wide().await {
532                    Ok(namespaces) => Ok(namespaces),
533                    Err(e) => {
534                        tracing::warn!(error = %e, "Failed to fetch namespaces for Tekton autocomplete");
535                        Ok(Vec::new())
536                    }
537                },
538                Err(e) => {
539                    tracing::warn!(error = %e, "Failed to create Tekton client for namespace autocomplete");
540                    Ok(Vec::new())
541                }
542            }
543        } else {
544            Ok(Vec::new())
545        }
546    }
547}