pipedash_plugin_argocd/
plugin.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use pipedash_plugin_api::*;
5use tracing::{
6    debug,
7    info,
8    warn,
9};
10
11use crate::{
12    client,
13    config,
14    mapper,
15    metadata,
16};
17
/// Fallback page size: applied when a request's `page_size` is 0, and reported
/// as the page size when no pagination parameters are given at all.
const DEFAULT_PAGE_SIZE: usize = 1_000;
19
20pub struct ArgocdPlugin {
21    metadata: PluginMetadata,
22    client: Option<client::ArgocdClient>,
23    provider_id: Option<i64>,
24    config: HashMap<String, String>,
25    server_url: Option<String>,
26    organizations_filter: Option<Vec<String>>,
27}
28
29impl Default for ArgocdPlugin {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl ArgocdPlugin {
36    pub fn new() -> Self {
37        Self {
38            metadata: metadata::create_metadata(),
39            client: None,
40            provider_id: None,
41            config: HashMap::new(),
42            server_url: None,
43            organizations_filter: None,
44        }
45    }
46
47    fn client(&self) -> PluginResult<&client::ArgocdClient> {
48        self.client
49            .as_ref()
50            .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
51    }
52
53    fn get_server_url(&self) -> PluginResult<&str> {
54        self.server_url
55            .as_deref()
56            .ok_or_else(|| PluginError::Internal("Server URL not set".to_string()))
57    }
58}
59
60#[async_trait]
61impl Plugin for ArgocdPlugin {
62    fn metadata(&self) -> &PluginMetadata {
63        &self.metadata
64    }
65
66    fn provider_type(&self) -> &str {
67        "argocd"
68    }
69
70    fn initialize(
71        &mut self, provider_id: i64, config: HashMap<String, String>,
72        http_client: Option<std::sync::Arc<reqwest::Client>>,
73    ) -> PluginResult<()> {
74        info!(provider_id, "Initializing ArgoCD plugin");
75        debug!(config_keys = ?config.keys().collect::<Vec<_>>());
76
77        let server_url = config::get_server_url(&config)?;
78        debug!(server_url, "Configured server URL");
79
80        let token = config::get_token(&config)?;
81        debug!(token_length = token.len(), "Retrieved authentication token");
82
83        let insecure = config::is_insecure(&config);
84        if insecure {
85            warn!("Insecure TLS mode enabled - certificate verification disabled");
86        }
87
88        let organizations_filter = config::parse_organizations_filter(&config);
89        debug!(?organizations_filter, "Organizations filter configured");
90
91        let client = client::ArgocdClient::new(http_client, server_url.clone(), token, insecure)?;
92        debug!("ArgoCD client created successfully");
93
94        self.client = Some(client);
95        self.provider_id = Some(provider_id);
96        self.server_url = Some(server_url);
97        self.organizations_filter = organizations_filter;
98        self.config = config;
99
100        info!("ArgoCD plugin initialization complete");
101        Ok(())
102    }
103
104    async fn validate_credentials(&self) -> PluginResult<bool> {
105        debug!("Validating ArgoCD credentials");
106        let client = self.client()?;
107        match client.list_applications(None).await {
108            Ok(apps) => {
109                info!(app_count = apps.len(), "Credentials validated successfully");
110                Ok(true)
111            }
112            Err(e) => {
113                warn!(error = ?e, "Credential validation failed");
114                Err(e)
115            }
116        }
117    }
118
119    async fn fetch_available_pipelines(
120        &self, params: Option<PaginationParams>,
121    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
122        debug!("Fetching available pipelines");
123        let client = self.client()?;
124
125        let mut apps = client.list_applications(None).await?;
126        debug!(
127            total_apps = apps.len(),
128            "Retrieved applications from ArgoCD"
129        );
130
131        if let Some(ref orgs_filter) = self.organizations_filter {
132            debug!(?orgs_filter, "Filtering applications by organizations");
133            apps.retain(|app| {
134                let git_org = config::extract_git_org(&app.spec.source.repo_url);
135                orgs_filter.contains(&git_org)
136            });
137            debug!(
138                filtered_apps = apps.len(),
139                "Applications after organization filtering"
140            );
141        }
142
143        let total_count = apps.len();
144        let mut available_pipelines: Vec<AvailablePipeline> =
145            apps.iter().map(mapper::map_available_pipeline).collect();
146
147        let (page, page_size) = if let Some(p) = params {
148            let page_num = if p.page == 0 { 1 } else { p.page };
149            let size = if p.page_size == 0 {
150                DEFAULT_PAGE_SIZE
151            } else {
152                p.page_size
153            };
154
155            let offset = (page_num.saturating_sub(1)) * size;
156
157            available_pipelines = available_pipelines
158                .into_iter()
159                .skip(offset)
160                .take(size)
161                .collect();
162
163            (page_num, size)
164        } else {
165            (1, DEFAULT_PAGE_SIZE)
166        };
167
168        debug!(
169            pipeline_count = available_pipelines.len(),
170            total = total_count,
171            page,
172            page_size,
173            "Mapped and paginated available pipelines"
174        );
175
176        Ok(PaginatedResponse::new(
177            available_pipelines,
178            page,
179            page_size,
180            total_count,
181        ))
182    }
183
184    async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
185        debug!("Fetching Git organizations from applications");
186        let client = self.client()?;
187
188        let apps = client.list_applications(None).await?;
189
190        debug!(
191            app_count = apps.len(),
192            "Extracting organizations from applications"
193        );
194
195        let mut orgs_map: std::collections::HashMap<String, Organization> =
196            std::collections::HashMap::new();
197
198        for app in apps {
199            let git_org = config::extract_git_org(&app.spec.source.repo_url);
200
201            if git_org == "unknown" || git_org.is_empty() {
202                continue;
203            }
204
205            if !orgs_map.contains_key(&git_org) {
206                orgs_map.insert(
207                    git_org.clone(),
208                    Organization {
209                        id: git_org.clone(),
210                        name: git_org.clone(),
211                        description: Some(format!("Git Organization: {}", git_org)),
212                    },
213                );
214            }
215        }
216
217        let organizations: Vec<Organization> = orgs_map.into_values().collect();
218        debug!(
219            org_count = organizations.len(),
220            org_names = ?organizations.iter().map(|o| &o.name).collect::<Vec<_>>(),
221            "Extracted unique Git organizations"
222        );
223
224        Ok(organizations)
225    }
226
227    async fn fetch_available_pipelines_filtered(
228        &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
229    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
230        debug!(?org, ?search, "Fetching filtered available pipelines");
231
232        let client = self.client()?;
233
234        let apps = client.list_applications(None).await?;
235
236        debug!(
237            total_apps = apps.len(),
238            "Retrieved applications for filtering"
239        );
240
241        let filtered_apps: Vec<_> = apps
242            .into_iter()
243            .filter(|app| {
244                let org_match = org.as_ref().is_none_or(|o| {
245                    let git_org = config::extract_git_org(&app.spec.source.repo_url);
246                    git_org == *o
247                });
248
249                let search_match = search.as_ref().is_none_or(|s| {
250                    app.metadata.name.to_lowercase().contains(&s.to_lowercase())
251                        || app
252                            .spec
253                            .source
254                            .repo_url
255                            .to_lowercase()
256                            .contains(&s.to_lowercase())
257                });
258
259                org_match && search_match
260            })
261            .collect();
262
263        debug!(
264            filtered_apps = filtered_apps.len(),
265            "Applications after applying filters"
266        );
267
268        let total_count = filtered_apps.len();
269        let mut available_pipelines: Vec<AvailablePipeline> = filtered_apps
270            .iter()
271            .map(mapper::map_available_pipeline)
272            .collect();
273
274        let (page, page_size) = if let Some(p) = params {
275            let page_num = if p.page == 0 { 1 } else { p.page };
276            let size = if p.page_size == 0 {
277                DEFAULT_PAGE_SIZE
278            } else {
279                p.page_size
280            };
281
282            let offset = (page_num.saturating_sub(1)) * size;
283
284            available_pipelines = available_pipelines
285                .into_iter()
286                .skip(offset)
287                .take(size)
288                .collect();
289
290            (page_num, size)
291        } else {
292            (1, DEFAULT_PAGE_SIZE)
293        };
294
295        debug!(
296            pipeline_count = available_pipelines.len(),
297            total = total_count,
298            page,
299            page_size,
300            "Mapped and paginated filtered pipelines"
301        );
302
303        Ok(PaginatedResponse::new(
304            available_pipelines,
305            page,
306            page_size,
307            total_count,
308        ))
309    }
310
311    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
312        debug!("Fetching configured pipelines");
313        let provider_id = self
314            .provider_id
315            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
316
317        let client = self.client()?;
318        let server_url = self.get_server_url()?;
319        debug!(server_url, "Using configured server URL");
320
321        let mut apps = client.list_applications(None).await?;
322        debug!(total_apps = apps.len(), "Retrieved all applications");
323
324        if let Some(ref orgs_filter) = self.organizations_filter {
325            debug!(?orgs_filter, "Applying organization filter");
326            apps.retain(|app| {
327                let git_org = config::extract_git_org(&app.spec.source.repo_url);
328                orgs_filter.contains(&git_org)
329            });
330            debug!(
331                filtered_apps = apps.len(),
332                "Applications after organization filter"
333            );
334        }
335
336        let filtered_apps = if let Some(selected_items) = config::parse_selected_items(&self.config)
337        {
338            debug!(
339                selected_count = selected_items.len(),
340                "Applying user selection filter"
341            );
342            apps.into_iter()
343                .filter(|app| selected_items.contains(&app.metadata.name))
344                .collect()
345        } else {
346            debug!("No user selection - returning all applications");
347            apps
348        };
349
350        debug!(
351            final_apps = filtered_apps.len(),
352            "Applications after all filters"
353        );
354
355        let pipelines: Vec<Pipeline> = filtered_apps
356            .iter()
357            .map(|app| mapper::map_application_to_pipeline(app, provider_id, server_url))
358            .collect();
359
360        debug!(
361            pipeline_count = pipelines.len(),
362            "Mapped applications to pipelines"
363        );
364        Ok(pipelines)
365    }
366
367    async fn fetch_run_history(
368        &self, pipeline_id: &str, limit: usize,
369    ) -> PluginResult<Vec<PipelineRun>> {
370        let (provider_id, _namespace, app_name) = config::parse_pipeline_id(pipeline_id)?;
371        let client = self.client()?;
372        let server_url = self.get_server_url()?;
373
374        let app = client.get_application(&app_name).await?;
375
376        let history = app.status.history.as_deref().unwrap_or(&[]);
377
378        let mut runs: Vec<PipelineRun> = history
379            .iter()
380            .rev()
381            .take(limit)
382            .map(|h| mapper::map_history_to_run(h, &app, provider_id, server_url))
383            .collect();
384
385        if let Some(current_run) = mapper::map_operation_to_run(&app, provider_id, server_url) {
386            runs.insert(0, current_run);
387        }
388
389        Ok(runs)
390    }
391
392    async fn fetch_run_details(
393        &self, pipeline_id: &str, run_number: i64,
394    ) -> PluginResult<PipelineRun> {
395        let (provider_id, _namespace, app_name) = config::parse_pipeline_id(pipeline_id)?;
396        let client = self.client()?;
397        let server_url = self.get_server_url()?;
398
399        let app = client.get_application(&app_name).await?;
400
401        if let Some(operation) = mapper::map_operation_to_run(&app, provider_id, server_url) {
402            if operation.run_number == run_number {
403                return Ok(operation);
404            }
405        }
406
407        let history = app.status.history.as_deref().unwrap_or(&[]);
408        let history_item = history
409            .iter()
410            .find(|h| h.deployed_at.timestamp() == run_number)
411            .ok_or_else(|| {
412                PluginError::PipelineNotFound(format!("Run {} not found", run_number))
413            })?;
414
415        Ok(mapper::map_history_to_run(
416            history_item,
417            &app,
418            provider_id,
419            server_url,
420        ))
421    }
422
423    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
424        let (_provider_id, _namespace, app_name) = config::parse_pipeline_id(&params.workflow_id)?;
425        let client = self.client()?;
426
427        let inputs = params.inputs.as_ref();
428
429        let revision = inputs
430            .and_then(|i| i.get("revision"))
431            .and_then(|v| v.as_str())
432            .map(String::from);
433
434        let prune = inputs
435            .and_then(|i| i.get("prune"))
436            .and_then(|v| v.as_bool())
437            .unwrap_or(false);
438
439        let dry_run = inputs
440            .and_then(|i| i.get("dry_run"))
441            .and_then(|v| v.as_bool())
442            .unwrap_or(false);
443
444        let force = inputs
445            .and_then(|i| i.get("force"))
446            .and_then(|v| v.as_bool())
447            .unwrap_or(false);
448
449        let apply_only = inputs
450            .and_then(|i| i.get("apply_only"))
451            .and_then(|v| v.as_bool())
452            .unwrap_or(false);
453
454        info!(
455            app_name,
456            ?revision,
457            prune,
458            dry_run,
459            force,
460            apply_only,
461            "Triggering ArgoCD sync operation"
462        );
463
464        client
465            .sync_application(&app_name, revision, prune, dry_run, force, apply_only)
466            .await?;
467
468        let sync_type = if dry_run { "Dry run" } else { "Sync" };
469        Ok(format!(
470            "{} triggered for application: {}",
471            sync_type, app_name
472        ))
473    }
474
475    async fn cancel_run(&self, pipeline_id: &str, _run_number: i64) -> PluginResult<()> {
476        let (_provider_id, _namespace, app_name) = config::parse_pipeline_id(pipeline_id)?;
477        let client = self.client()?;
478
479        client.terminate_operation(&app_name).await?;
480
481        Ok(())
482    }
483
484    async fn fetch_workflow_parameters(
485        &self, _workflow_id: &str,
486    ) -> PluginResult<Vec<WorkflowParameter>> {
487        Ok(vec![
488            WorkflowParameter {
489                name: "revision".to_string(),
490                label: Some("Revision".to_string()),
491                description: Some("Git revision (branch, tag, or commit SHA) to sync to. Leave empty to use target revision.".to_string()),
492                param_type: WorkflowParameterType::String { default: None },
493                required: false,
494            },
495            WorkflowParameter {
496                name: "prune".to_string(),
497                label: Some("Prune Resources".to_string()),
498                description: Some("Delete resources that are no longer defined in Git".to_string()),
499                param_type: WorkflowParameterType::Boolean { default: false },
500                required: false,
501            },
502            WorkflowParameter {
503                name: "dry_run".to_string(),
504                label: Some("Dry Run".to_string()),
505                description: Some("Preview sync without applying changes".to_string()),
506                param_type: WorkflowParameterType::Boolean { default: false },
507                required: false,
508            },
509            WorkflowParameter {
510                name: "force".to_string(),
511                label: Some("Force Sync".to_string(),),
512                description: Some("Force sync even if resources are already synced (overrides any state)".to_string()),
513                param_type: WorkflowParameterType::Boolean { default: false },
514                required: false,
515            },
516            WorkflowParameter {
517                name: "apply_only".to_string(),
518                label: Some("Apply Only (Skip Hooks)".to_string()),
519                description: Some("Skip pre and post sync hooks".to_string()),
520                param_type: WorkflowParameterType::Boolean { default: false },
521                required: false,
522            },
523        ])
524    }
525
526    async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
527        Ok(vec![])
528    }
529
530    async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
531        Ok(vec![])
532    }
533
534    async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
535        Ok(vec![])
536    }
537
538    fn get_migrations(&self) -> Vec<String> {
539        vec![]
540    }
541
542    async fn get_field_options(
543        &self, field_key: &str, config: &HashMap<String, String>,
544    ) -> PluginResult<Vec<String>> {
545        if field_key == "organizations" {
546            let server_url = config::get_server_url(config)?;
547            let token = config::get_token(config)?;
548            let insecure = config::is_insecure(config);
549
550            let temp_client = client::ArgocdClient::new(None, server_url, token, insecure)?;
551
552            let apps = temp_client.list_applications(None).await?;
553
554            let mut git_orgs = std::collections::HashSet::new();
555            for app in apps {
556                let git_org = config::extract_git_org(&app.spec.source.repo_url);
557                if git_org != "unknown" && !git_org.is_empty() {
558                    git_orgs.insert(git_org);
559                }
560            }
561
562            let mut orgs: Vec<String> = git_orgs.into_iter().collect();
563            orgs.sort();
564
565            debug!(
566                org_count = orgs.len(),
567                "Returning Git organizations for dropdown"
568            );
569            Ok(orgs)
570        } else {
571            Ok(vec![])
572        }
573    }
574}