pipedash_plugin_jenkins/
plugin.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9    client,
10    config,
11    mapper,
12    metadata,
13};
14
15pub struct JenkinsPlugin {
16    metadata: PluginMetadata,
17    client: Option<client::JenkinsClient>,
18    provider_id: Option<i64>,
19    config: HashMap<String, String>,
20}
21
22impl Default for JenkinsPlugin {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl JenkinsPlugin {
29    pub fn new() -> Self {
30        Self {
31            metadata: metadata::create_metadata(),
32            client: None,
33            provider_id: None,
34            config: HashMap::new(),
35        }
36    }
37
38    fn client(&self) -> PluginResult<&client::JenkinsClient> {
39        self.client
40            .as_ref()
41            .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
42    }
43}
44
45#[async_trait]
46impl Plugin for JenkinsPlugin {
47    fn metadata(&self) -> &PluginMetadata {
48        &self.metadata
49    }
50
51    fn initialize(
52        &mut self, provider_id: i64, config: HashMap<String, String>,
53        http_client: Option<std::sync::Arc<reqwest::Client>>,
54    ) -> PluginResult<()> {
55        let token = config
56            .get("token")
57            .ok_or_else(|| PluginError::InvalidConfig("Missing Jenkins API token".to_string()))?;
58
59        let username = config
60            .get("username")
61            .ok_or_else(|| PluginError::InvalidConfig("Missing Jenkins username".to_string()))?;
62
63        let auth_value = format!("{username}:{token}");
64        let auth_header = format!(
65            "Basic {}",
66            base64::Engine::encode(
67                &base64::engine::general_purpose::STANDARD,
68                auth_value.as_bytes()
69            )
70        );
71
72        let server_url = config
73            .get("server_url")
74            .ok_or_else(|| PluginError::InvalidConfig("Missing server_url".to_string()))?
75            .trim_end_matches('/')
76            .to_string();
77
78        let client = http_client.unwrap_or_else(|| {
79            std::sync::Arc::new(
80                reqwest::Client::builder()
81                    .use_rustls_tls()
82                    .pool_max_idle_per_host(10)
83                    .timeout(Duration::from_secs(30))
84                    .connect_timeout(Duration::from_secs(10))
85                    .tcp_keepalive(Duration::from_secs(60))
86                    .build()
87                    .expect("Failed to build HTTP client"),
88            )
89        });
90
91        self.client = Some(client::JenkinsClient::new(client, server_url, auth_header));
92        self.provider_id = Some(provider_id);
93        self.config = config;
94
95        Ok(())
96    }
97
98    async fn validate_credentials(&self) -> PluginResult<bool> {
99        let client = self.client()?;
100
101        client.discover_all_jobs().await?;
102        Ok(true)
103    }
104
105    async fn fetch_available_pipelines(
106        &self, params: Option<PaginationParams>,
107    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
108        let params = params.unwrap_or_default();
109        let client = self.client()?;
110        let all_jobs = client.discover_all_jobs().await?;
111        let all_pipelines = client.discovered_jobs_to_available_pipelines(all_jobs);
112
113        let total_count = all_pipelines.len();
114        let start = ((params.page - 1) * params.page_size).min(total_count);
115        let end = (start + params.page_size).min(total_count);
116        let items = all_pipelines[start..end].to_vec();
117
118        Ok(PaginatedResponse::new(
119            items,
120            params.page,
121            params.page_size,
122            total_count,
123        ))
124    }
125
126    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
127        let provider_id = self
128            .provider_id
129            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
130
131        let job_paths = config::parse_selected_items(&self.config)?;
132
133        if job_paths.is_empty() {
134            return Err(PluginError::InvalidConfig("No jobs configured".to_string()));
135        }
136
137        let client = self.client()?;
138        let futures = job_paths
139            .iter()
140            .map(|job_path| client.fetch_pipeline(provider_id, job_path.clone()));
141
142        let results = join_all(futures).await;
143
144        let mut all_pipelines = Vec::new();
145        let mut errors = Vec::new();
146
147        for result in results {
148            match result {
149                Ok(pipeline) => all_pipelines.push(pipeline),
150                Err(e) => errors.push(e),
151            }
152        }
153
154        if !errors.is_empty() && all_pipelines.is_empty() {
155            return Err(errors.into_iter().next().unwrap());
156        }
157
158        Ok(all_pipelines)
159    }
160
161    async fn fetch_run_history(
162        &self, pipeline_id: &str, limit: usize,
163    ) -> PluginResult<Vec<PipelineRun>> {
164        let parts: Vec<&str> = pipeline_id.split("__").collect();
165        if parts.len() != 3 {
166            return Err(PluginError::InvalidConfig(format!(
167                "Invalid pipeline ID format: {pipeline_id}"
168            )));
169        }
170
171        let job_path = parts[2];
172        let client = self.client()?;
173        let builds = client.fetch_build_history(job_path, limit).await?;
174
175        let pipeline_runs = builds
176            .into_iter()
177            .map(|build| {
178                let encoded_path = config::encode_job_name(job_path);
179                mapper::build_to_pipeline_run(
180                    build,
181                    pipeline_id,
182                    client.server_url(),
183                    &encoded_path,
184                )
185            })
186            .collect();
187
188        Ok(pipeline_runs)
189    }
190
191    async fn fetch_run_details(
192        &self, pipeline_id: &str, run_number: i64,
193    ) -> PluginResult<PipelineRun> {
194        let parts: Vec<&str> = pipeline_id.split("__").collect();
195        if parts.len() != 3 {
196            return Err(PluginError::InvalidConfig(format!(
197                "Invalid pipeline ID format: {pipeline_id}"
198            )));
199        }
200
201        let job_path = parts[2];
202        let client = self.client()?;
203        let build = client.fetch_build_details(job_path, run_number).await?;
204
205        let encoded_path = config::encode_job_name(job_path);
206        Ok(mapper::build_to_pipeline_run(
207            build,
208            pipeline_id,
209            client.server_url(),
210            &encoded_path,
211        ))
212    }
213
214    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
215        let parts: Vec<&str> = params.workflow_id.split("__").collect();
216        if parts.len() != 3 {
217            return Err(PluginError::InvalidConfig(format!(
218                "Invalid workflow ID format: {}",
219                params.workflow_id
220            )));
221        }
222
223        let job_path = parts[2];
224
225        let mut form_data = Vec::new();
226        if let Some(inputs) = params.inputs {
227            if let Some(obj) = inputs.as_object() {
228                for (k, v) in obj.iter() {
229                    if v.is_null() {
230                        continue;
231                    }
232
233                    if v.is_array() {
234                        if let Some(arr) = v.as_array() {
235                            for item in arr {
236                                if item.is_null() {
237                                    continue;
238                                }
239                                let value_str = if item.is_boolean() {
240                                    item.as_bool().unwrap().to_string()
241                                } else if item.is_number() {
242                                    item.to_string()
243                                } else {
244                                    item.as_str()
245                                        .map(|s| s.to_string())
246                                        .unwrap_or_else(|| item.to_string())
247                                };
248                                form_data.push((k.clone(), value_str));
249                            }
250                        }
251                    } else if v.is_boolean() {
252                        form_data.push((k.clone(), v.as_bool().unwrap().to_string()));
253                    } else if v.is_number() {
254                        form_data.push((k.clone(), v.to_string()));
255                    } else if let Some(s) = v.as_str() {
256                        form_data.push((k.clone(), s.to_string()));
257                    }
258                }
259            }
260        }
261
262        if form_data.is_empty() {
263            form_data.push(("json".to_string(), serde_json::json!({}).to_string()));
264        }
265
266        let client = self.client()?;
267        client.trigger_build(job_path, form_data).await?;
268
269        Ok(serde_json::json!({
270            "message": format!("Triggered build for job {job_path}"),
271            "job_path": job_path
272        })
273        .to_string())
274    }
275
276    async fn fetch_workflow_parameters(
277        &self, workflow_id: &str,
278    ) -> PluginResult<Vec<WorkflowParameter>> {
279        let start = std::time::Instant::now();
280        tracing::debug!(workflow_id = %workflow_id, "Fetching Jenkins workflow parameters");
281
282        let parts: Vec<&str> = workflow_id.split("__").collect();
283        if parts.len() != 3 {
284            return Err(PluginError::InvalidConfig(format!(
285                "Invalid workflow ID format: {workflow_id}"
286            )));
287        }
288
289        let job_path = parts[2];
290        let client = self.client()?;
291        let response = client.fetch_job_parameters(job_path).await?;
292
293        let param_definitions: Vec<_> = response
294            .property
295            .into_iter()
296            .filter(|prop| {
297                prop._class
298                    .as_ref()
299                    .map(|c| {
300                        c.contains("ParametersDefinitionProperty")
301                            || c.contains("ParametersProperty")
302                    })
303                    .unwrap_or(true)
304            })
305            .flat_map(|prop| prop.parameter_definitions)
306            .collect();
307
308        let parameters = mapper::parameter_definitions_to_workflow_parameters(param_definitions);
309
310        tracing::debug!(
311            count = parameters.len(),
312            elapsed = ?start.elapsed(),
313            "Processed Jenkins workflow parameters"
314        );
315        Ok(parameters)
316    }
317
318    async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
319        Ok(vec![Organization {
320            id: "default".to_string(),
321            name: "Jenkins Server".to_string(),
322            description: Some("All accessible Jenkins jobs".to_string()),
323        }])
324    }
325
326    async fn fetch_available_pipelines_filtered(
327        &self, _org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
328    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
329        let params = params.unwrap_or_default();
330        let client = self.client()?;
331        let all_jobs = client.discover_all_jobs().await?;
332        let mut all_pipelines = client.discovered_jobs_to_available_pipelines(all_jobs);
333
334        if let Some(search_term) = search {
335            let search_lower = search_term.to_lowercase();
336            all_pipelines.retain(|p| {
337                p.name.to_lowercase().contains(&search_lower)
338                    || p.id.to_lowercase().contains(&search_lower)
339                    || p.description
340                        .as_ref()
341                        .is_some_and(|d| d.to_lowercase().contains(&search_lower))
342            });
343        }
344
345        let total_count = all_pipelines.len();
346        let start = ((params.page - 1) * params.page_size).min(total_count);
347        let end = (start + params.page_size).min(total_count);
348        let items = all_pipelines[start..end].to_vec();
349
350        Ok(PaginatedResponse::new(
351            items,
352            params.page,
353            params.page_size,
354            total_count,
355        ))
356    }
357
358    async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
359        let parts: Vec<&str> = pipeline_id.split("__").collect();
360        if parts.len() != 3 {
361            return Err(PluginError::InvalidConfig(format!(
362                "Invalid pipeline ID format: {pipeline_id}"
363            )));
364        }
365
366        let job_path = parts[2];
367        let client = self.client()?;
368        client.cancel_build(job_path, run_number).await
369    }
370}