1use std::collections::HashMap;
2use std::sync::OnceLock;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9 client,
10 config,
11 mapper,
12 metadata,
13 types,
14};
15
16pub struct TektonPlugin {
17 metadata: PluginMetadata,
18 client: OnceLock<client::TektonClient>,
19 provider_id: Option<i64>,
20 config: HashMap<String, String>,
21}
22
23impl Default for TektonPlugin {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl TektonPlugin {
30 pub fn new() -> Self {
31 Self {
32 metadata: metadata::create_metadata(),
33 client: OnceLock::new(),
34 provider_id: None,
35 config: HashMap::new(),
36 }
37 }
38
39 async fn client(&self) -> PluginResult<&client::TektonClient> {
40 if let Some(client) = self.client.get() {
41 return Ok(client);
42 }
43
44 let kubeconfig_path = config::get_kubeconfig_path(&self.config);
45 let context = config::get_context(&self.config);
46
47 let new_client =
48 client::TektonClient::from_kubeconfig(kubeconfig_path.as_deref(), context.as_deref())
49 .await?;
50
51 Ok(self.client.get_or_init(|| new_client))
52 }
53
54 async fn fetch_all_pipelines_in_namespaces(&self) -> PluginResult<Vec<types::TektonPipeline>> {
55 let client = self.client().await?;
56
57 let selected_ids = config::get_selected_pipelines(&self.config);
58
59 let namespaces = if selected_ids.is_empty() {
60 let namespace_mode = config::get_namespace_mode(&self.config);
61
62 match namespace_mode {
63 config::NamespaceMode::Custom => config::get_namespaces(&self.config),
64 config::NamespaceMode::All => client.list_namespaces_with_pipelines().await?,
65 }
66 } else {
67 let unique_namespaces: std::collections::HashSet<String> = selected_ids
68 .iter()
69 .filter_map(|id| {
70 config::parse_pipeline_id(id)
71 .ok()
72 .map(|(_provider_id, namespace, _pipeline_name)| namespace)
73 })
74 .collect();
75 unique_namespaces.into_iter().collect()
76 };
77
78 let pipeline_futures = namespaces
79 .iter()
80 .map(|namespace| async move { client.list_pipelines(namespace).await.ok() });
81
82 let results: Vec<Option<Vec<types::TektonPipeline>>> = join_all(pipeline_futures).await;
83
84 let all_pipelines: Vec<types::TektonPipeline> =
85 results.into_iter().flatten().flatten().collect();
86
87 if selected_ids.is_empty() {
88 Ok(all_pipelines)
89 } else {
90 Ok(all_pipelines
91 .into_iter()
92 .filter(|p| {
93 let id = format!("{}__{}", p.metadata.namespace, p.metadata.name);
94 selected_ids.contains(&id)
95 })
96 .collect())
97 }
98 }
99
100 async fn fetch_latest_run_for_pipeline(
101 &self, namespace: &str, pipeline_name: &str,
102 ) -> Option<types::TektonPipelineRun> {
103 let client = self.client().await.ok()?;
104 let mut runs = client
105 .list_pipelineruns(namespace, Some(pipeline_name))
106 .await
107 .ok()?;
108
109 runs.sort_by(|a, b| {
110 let a_time = types::parse_timestamp(&a.metadata.creation_timestamp);
111 let b_time = types::parse_timestamp(&b.metadata.creation_timestamp);
112 b_time.cmp(&a_time)
113 });
114
115 runs.into_iter().next()
116 }
117
118 fn get_available_contexts(&self, kubeconfig_path: Option<&str>) -> PluginResult<Vec<String>> {
119 use std::collections::HashSet;
120 use std::path::PathBuf;
121
122 let paths = if let Some(path_str) = kubeconfig_path {
123 config::split_kubeconfig_paths(path_str)
124 } else {
125 let default_path = config::get_default_kubeconfig_path();
126 config::split_kubeconfig_paths(&default_path)
127 };
128
129 let mut all_contexts = HashSet::new();
130
131 for path_str in paths {
132 let path = PathBuf::from(&path_str);
133 if !path.exists() {
134 continue;
135 }
136
137 match kube::config::Kubeconfig::read_from(&path) {
138 Ok(kubeconfig) => {
139 for context in kubeconfig.contexts {
140 all_contexts.insert(context.name);
141 }
142 }
143 Err(_) => continue,
144 }
145 }
146
147 if all_contexts.is_empty() {
148 return Err(PluginError::InvalidConfig(
149 "No valid kubeconfig files found or no contexts available".to_string(),
150 ));
151 }
152
153 let mut contexts: Vec<String> = all_contexts.into_iter().collect();
154 contexts.sort();
155 Ok(contexts)
156 }
157}
158
159#[async_trait]
160impl Plugin for TektonPlugin {
161 fn metadata(&self) -> &PluginMetadata {
162 &self.metadata
163 }
164
165 fn provider_type(&self) -> &str {
166 "tekton"
167 }
168
169 fn initialize(
170 &mut self, provider_id: i64, config: HashMap<String, String>,
171 _http_client: Option<std::sync::Arc<reqwest::Client>>,
172 ) -> PluginResult<()> {
173 self.provider_id = Some(provider_id);
174 self.config = config;
175 Ok(())
176 }
177
178 async fn validate_credentials(&self) -> PluginResult<bool> {
179 let client = self.client().await?;
180 let namespace_mode = config::get_namespace_mode(&self.config);
181
182 let namespaces = match namespace_mode {
183 config::NamespaceMode::Custom => {
184 let manual_namespaces = config::get_namespaces(&self.config);
185
186 if manual_namespaces.is_empty() {
187 return Err(PluginError::InvalidConfig(
188 "Namespace mode is set to 'custom' but no namespaces are specified. Please provide at least one namespace in the 'namespaces' field (e.g., 'default,tekton-pipelines').".to_string(),
189 ));
190 }
191
192 client
193 .validate_namespaces_have_pipelines(&manual_namespaces)
194 .await?
195 }
196 config::NamespaceMode::All => match client.try_list_namespaces_cluster_wide().await {
197 Ok(all_namespaces) => {
198 if all_namespaces.is_empty() {
199 return Err(PluginError::InvalidConfig(
200 "No namespaces found in the cluster. Please verify your cluster connection and permissions.".to_string(),
201 ));
202 }
203 client.list_namespaces_with_pipelines().await?
204 }
205 Err(e) => return Err(e),
206 },
207 };
208
209 if namespaces.is_empty() {
210 let hint = match namespace_mode {
211 config::NamespaceMode::Custom => "Verify that the specified namespaces exist and contain Tekton pipelines, and that you have permissions to access them.",
212 config::NamespaceMode::All => "Try switching to 'custom' namespace mode and manually specify the namespaces containing your Tekton pipelines.",
213 };
214
215 return Err(PluginError::InvalidConfig(format!(
216 "No Tekton pipelines found in any accessible namespace. {}",
217 hint
218 )));
219 }
220
221 Ok(true)
222 }
223
224 async fn fetch_available_pipelines(
225 &self, params: Option<PaginationParams>,
226 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
227 let params = params.unwrap_or_default();
228 let pipelines = self.fetch_all_pipelines_in_namespaces().await?;
229 let all_pipelines: Vec<_> = pipelines
230 .iter()
231 .map(mapper::map_available_pipeline)
232 .collect();
233
234 let total_count = all_pipelines.len();
235 let start = ((params.page - 1) * params.page_size).min(total_count);
236 let end = (start + params.page_size).min(total_count);
237 let items = all_pipelines[start..end].to_vec();
238
239 Ok(PaginatedResponse::new(
240 items,
241 params.page,
242 params.page_size,
243 total_count,
244 ))
245 }
246
247 async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
248 let provider_id = self
249 .provider_id
250 .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
251
252 let pipelines = self.fetch_all_pipelines_in_namespaces().await?;
253
254 let pipeline_futures = pipelines.iter().map(|pipeline| async move {
255 let latest_run = self
256 .fetch_latest_run_for_pipeline(
257 &pipeline.metadata.namespace,
258 &pipeline.metadata.name,
259 )
260 .await;
261 mapper::map_pipeline(pipeline, latest_run.as_ref(), provider_id)
262 });
263
264 let results = join_all(pipeline_futures).await;
265 Ok(results)
266 }
267
268 async fn fetch_run_history(
269 &self, pipeline_id: &str, limit: usize,
270 ) -> PluginResult<Vec<PipelineRun>> {
271 let (provider_id, namespace, pipeline_name) = config::parse_pipeline_id(pipeline_id)?;
272 let client = self.client().await?;
273
274 let mut runs = client
275 .list_pipelineruns(&namespace, Some(&pipeline_name))
276 .await?;
277
278 runs.sort_by(|a, b| {
279 let a_time = types::parse_timestamp(&a.metadata.creation_timestamp);
280 let b_time = types::parse_timestamp(&b.metadata.creation_timestamp);
281 b_time.cmp(&a_time)
282 });
283
284 let limited_runs: Vec<types::TektonPipelineRun> = runs.into_iter().take(limit).collect();
285
286 Ok(limited_runs
287 .iter()
288 .map(|run| mapper::map_pipeline_run(run, provider_id))
289 .collect())
290 }
291
292 async fn fetch_run_details(
293 &self, pipeline_id: &str, run_number: i64,
294 ) -> PluginResult<PipelineRun> {
295 let (provider_id, namespace, _pipeline_name) = config::parse_pipeline_id(pipeline_id)?;
296 let client = self.client().await?;
297
298 let runs = client.list_pipelineruns(&namespace, None).await?;
299
300 let run = runs
301 .into_iter()
302 .find(|r| {
303 types::parse_timestamp(&r.metadata.creation_timestamp).map(|dt| dt.timestamp())
304 == Some(run_number)
305 })
306 .ok_or_else(|| {
307 PluginError::PipelineNotFound(format!(
308 "PipelineRun with timestamp {} not found",
309 run_number
310 ))
311 })?;
312
313 Ok(mapper::map_pipeline_run(&run, provider_id))
314 }
315
316 async fn fetch_workflow_parameters(
317 &self, workflow_id: &str,
318 ) -> PluginResult<Vec<WorkflowParameter>> {
319 let (_provider_id, namespace, pipeline_name) = config::parse_pipeline_id(workflow_id)?;
320 let client = self.client().await?;
321
322 let pipeline = client.get_pipeline(&namespace, &pipeline_name).await?;
323
324 Ok(mapper::map_workflow_parameters(&pipeline))
325 }
326
327 async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
328 let (_provider_id, namespace, pipeline_name) =
329 config::parse_pipeline_id(¶ms.workflow_id)?;
330
331 let client = self.client().await?;
332
333 let pipeline = client.get_pipeline(&namespace, &pipeline_name).await?;
334
335 let param_values: Vec<types::ParamValue> = if let Some(inputs) = ¶ms.inputs {
336 inputs
337 .as_object()
338 .map(|obj| {
339 obj.iter()
340 .map(|(key, value)| types::ParamValue {
341 name: key.clone(),
342 value: value.clone(),
343 })
344 .collect()
345 })
346 .unwrap_or_default()
347 } else {
348 vec![]
349 };
350
351 let workspaces: Vec<types::WorkspaceBinding> = pipeline
352 .spec
353 .workspaces
354 .iter()
355 .filter_map(|ws| {
356 if ws.optional.unwrap_or(false) {
357 None
358 } else {
359 Some(types::WorkspaceBinding {
360 name: ws.name.clone(),
361 empty_dir: Some(serde_json::json!({})),
362 persistent_volume_claim: None,
363 config_map: None,
364 secret: None,
365 })
366 }
367 })
368 .collect();
369
370 let run_name = format!("{}-{}", pipeline_name, chrono::Utc::now().timestamp());
371
372 let mut annotations = HashMap::new();
373 annotations.insert("tekton.dev/triggeredBy".to_string(), "pipedash".to_string());
374
375 let pipelinerun = types::TektonPipelineRun {
376 api_version: "tekton.dev/v1".to_string(),
377 kind: "PipelineRun".to_string(),
378 metadata: types::ObjectMeta {
379 name: run_name.clone(),
380 namespace: namespace.clone(),
381 creation_timestamp: None,
382 labels: HashMap::new(),
383 annotations,
384 },
385 spec: types::PipelineRunSpec {
386 pipeline_ref: Some(types::PipelineRef {
387 name: pipeline_name.clone(),
388 }),
389 params: param_values,
390 workspaces,
391 timeout: None,
392 task_run_template: None,
393 },
394 status: types::PipelineRunStatus {
395 conditions: vec![],
396 start_time: None,
397 completion_time: None,
398 task_runs: HashMap::new(),
399 child_references: vec![],
400 },
401 };
402
403 let created_run = client.create_pipelinerun(&namespace, &pipelinerun).await?;
404
405 Ok(format!(
406 "PipelineRun created: {}/{}",
407 namespace, created_run.metadata.name
408 ))
409 }
410
411 async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
412 let (_provider_id, namespace, _pipeline_name) = config::parse_pipeline_id(pipeline_id)?;
413 let client = self.client().await?;
414
415 let runs = client.list_pipelineruns(&namespace, None).await?;
416
417 let matching_runs: Vec<_> = runs
418 .into_iter()
419 .filter(|r| {
420 types::parse_timestamp(&r.metadata.creation_timestamp).map(|dt| dt.timestamp())
421 == Some(run_number)
422 })
423 .collect();
424
425 if matching_runs.is_empty() {
426 return Err(PluginError::PipelineNotFound(format!(
427 "PipelineRun with timestamp {} not found",
428 run_number
429 )));
430 }
431
432 if matching_runs.len() > 1 {
433 tracing::warn!(
434 run_number = run_number,
435 run_name = %matching_runs[0].metadata.name,
436 count = matching_runs.len(),
437 "Multiple PipelineRuns found with same timestamp, cancelling first one"
438 );
439 }
440
441 let run = &matching_runs[0];
442
443 client
444 .delete_pipelinerun(&namespace, &run.metadata.name)
445 .await?;
446
447 Ok(())
448 }
449
450 async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
451 Ok(vec![Organization {
452 id: "default".to_string(),
453 name: "All Namespaces".to_string(),
454 description: Some("All accessible Kubernetes namespaces".to_string()),
455 }])
456 }
457
458 async fn fetch_available_pipelines_filtered(
459 &self, _org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
460 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
461 let params = params.unwrap_or_default();
462 let pipelines = self.fetch_all_pipelines_in_namespaces().await?;
463 let mut all_pipelines: Vec<_> = pipelines
464 .iter()
465 .map(mapper::map_available_pipeline)
466 .collect();
467
468 if let Some(search_term) = search {
469 let search_lower = search_term.to_lowercase();
470 all_pipelines.retain(|p| {
471 p.name.to_lowercase().contains(&search_lower)
472 || p.id.to_lowercase().contains(&search_lower)
473 || p.description
474 .as_ref()
475 .is_some_and(|d| d.to_lowercase().contains(&search_lower))
476 });
477 }
478
479 let total_count = all_pipelines.len();
480 let start = ((params.page - 1) * params.page_size).min(total_count);
481 let end = (start + params.page_size).min(total_count);
482 let items = all_pipelines[start..end].to_vec();
483
484 Ok(PaginatedResponse::new(
485 items,
486 params.page,
487 params.page_size,
488 total_count,
489 ))
490 }
491
492 async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
493 Err(PluginError::NotSupported(
494 "Build agents not supported by Tekton plugin".to_string(),
495 ))
496 }
497
498 async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
499 Err(PluginError::NotSupported(
500 "Artifacts not implemented for Tekton plugin".to_string(),
501 ))
502 }
503
504 async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
505 Err(PluginError::NotSupported(
506 "Build queues not supported by Tekton plugin".to_string(),
507 ))
508 }
509
510 fn get_migrations(&self) -> Vec<String> {
511 vec![]
512 }
513
514 async fn get_field_options(
515 &self, field_key: &str, config: &HashMap<String, String>,
516 ) -> PluginResult<Vec<String>> {
517 if field_key == "context" {
518 let kubeconfig_path = config::get_kubeconfig_path(config);
519 let contexts = self.get_available_contexts(kubeconfig_path.as_deref())?;
520 Ok(contexts)
521 } else if field_key == "namespaces" {
522 let kubeconfig_path = config::get_kubeconfig_path(config);
523 let context = config::get_context(config);
524
525 match client::TektonClient::from_kubeconfig(
526 kubeconfig_path.as_deref(),
527 context.as_deref(),
528 )
529 .await
530 {
531 Ok(temp_client) => match temp_client.try_list_namespaces_cluster_wide().await {
532 Ok(namespaces) => Ok(namespaces),
533 Err(e) => {
534 tracing::warn!(error = %e, "Failed to fetch namespaces for Tekton autocomplete");
535 Ok(Vec::new())
536 }
537 },
538 Err(e) => {
539 tracing::warn!(error = %e, "Failed to create Tekton client for namespace autocomplete");
540 Ok(Vec::new())
541 }
542 }
543 } else {
544 Ok(Vec::new())
545 }
546 }
547}