pipedash_plugin_jenkins/
plugin.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9 client,
10 config,
11 mapper,
12 metadata,
13};
14
15pub struct JenkinsPlugin {
16 metadata: PluginMetadata,
17 client: Option<client::JenkinsClient>,
18 provider_id: Option<i64>,
19 config: HashMap<String, String>,
20}
21
22impl Default for JenkinsPlugin {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl JenkinsPlugin {
29 pub fn new() -> Self {
30 Self {
31 metadata: metadata::create_metadata(),
32 client: None,
33 provider_id: None,
34 config: HashMap::new(),
35 }
36 }
37
38 fn client(&self) -> PluginResult<&client::JenkinsClient> {
39 self.client
40 .as_ref()
41 .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
42 }
43}
44
45#[async_trait]
46impl Plugin for JenkinsPlugin {
47 fn metadata(&self) -> &PluginMetadata {
48 &self.metadata
49 }
50
51 fn initialize(
52 &mut self, provider_id: i64, config: HashMap<String, String>,
53 http_client: Option<std::sync::Arc<reqwest::Client>>,
54 ) -> PluginResult<()> {
55 let token = config
56 .get("token")
57 .ok_or_else(|| PluginError::InvalidConfig("Missing Jenkins API token".to_string()))?;
58
59 let username = config
60 .get("username")
61 .ok_or_else(|| PluginError::InvalidConfig("Missing Jenkins username".to_string()))?;
62
63 let auth_value = format!("{username}:{token}");
64 let auth_header = format!(
65 "Basic {}",
66 base64::Engine::encode(
67 &base64::engine::general_purpose::STANDARD,
68 auth_value.as_bytes()
69 )
70 );
71
72 let server_url = config
73 .get("server_url")
74 .ok_or_else(|| PluginError::InvalidConfig("Missing server_url".to_string()))?
75 .trim_end_matches('/')
76 .to_string();
77
78 let client = http_client.unwrap_or_else(|| {
79 std::sync::Arc::new(
80 reqwest::Client::builder()
81 .use_rustls_tls()
82 .pool_max_idle_per_host(10)
83 .timeout(Duration::from_secs(30))
84 .connect_timeout(Duration::from_secs(10))
85 .tcp_keepalive(Duration::from_secs(60))
86 .build()
87 .expect("Failed to build HTTP client"),
88 )
89 });
90
91 self.client = Some(client::JenkinsClient::new(client, server_url, auth_header));
92 self.provider_id = Some(provider_id);
93 self.config = config;
94
95 Ok(())
96 }
97
98 async fn validate_credentials(&self) -> PluginResult<bool> {
99 let client = self.client()?;
100
101 client.discover_all_jobs().await?;
102 Ok(true)
103 }
104
105 async fn fetch_available_pipelines(
106 &self, params: Option<PaginationParams>,
107 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
108 let params = params.unwrap_or_default();
109 let client = self.client()?;
110 let all_jobs = client.discover_all_jobs().await?;
111 let all_pipelines = client.discovered_jobs_to_available_pipelines(all_jobs);
112
113 let total_count = all_pipelines.len();
114 let start = ((params.page - 1) * params.page_size).min(total_count);
115 let end = (start + params.page_size).min(total_count);
116 let items = all_pipelines[start..end].to_vec();
117
118 Ok(PaginatedResponse::new(
119 items,
120 params.page,
121 params.page_size,
122 total_count,
123 ))
124 }
125
126 async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
127 let provider_id = self
128 .provider_id
129 .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
130
131 let job_paths = config::parse_selected_items(&self.config)?;
132
133 if job_paths.is_empty() {
134 return Err(PluginError::InvalidConfig("No jobs configured".to_string()));
135 }
136
137 let client = self.client()?;
138 let futures = job_paths
139 .iter()
140 .map(|job_path| client.fetch_pipeline(provider_id, job_path.clone()));
141
142 let results = join_all(futures).await;
143
144 let mut all_pipelines = Vec::new();
145 let mut errors = Vec::new();
146
147 for result in results {
148 match result {
149 Ok(pipeline) => all_pipelines.push(pipeline),
150 Err(e) => errors.push(e),
151 }
152 }
153
154 if !errors.is_empty() && all_pipelines.is_empty() {
155 return Err(errors.into_iter().next().unwrap());
156 }
157
158 Ok(all_pipelines)
159 }
160
161 async fn fetch_run_history(
162 &self, pipeline_id: &str, limit: usize,
163 ) -> PluginResult<Vec<PipelineRun>> {
164 let parts: Vec<&str> = pipeline_id.split("__").collect();
165 if parts.len() != 3 {
166 return Err(PluginError::InvalidConfig(format!(
167 "Invalid pipeline ID format: {pipeline_id}"
168 )));
169 }
170
171 let job_path = parts[2];
172 let client = self.client()?;
173 let builds = client.fetch_build_history(job_path, limit).await?;
174
175 let pipeline_runs = builds
176 .into_iter()
177 .map(|build| {
178 let encoded_path = config::encode_job_name(job_path);
179 mapper::build_to_pipeline_run(
180 build,
181 pipeline_id,
182 client.server_url(),
183 &encoded_path,
184 )
185 })
186 .collect();
187
188 Ok(pipeline_runs)
189 }
190
191 async fn fetch_run_details(
192 &self, pipeline_id: &str, run_number: i64,
193 ) -> PluginResult<PipelineRun> {
194 let parts: Vec<&str> = pipeline_id.split("__").collect();
195 if parts.len() != 3 {
196 return Err(PluginError::InvalidConfig(format!(
197 "Invalid pipeline ID format: {pipeline_id}"
198 )));
199 }
200
201 let job_path = parts[2];
202 let client = self.client()?;
203 let build = client.fetch_build_details(job_path, run_number).await?;
204
205 let encoded_path = config::encode_job_name(job_path);
206 Ok(mapper::build_to_pipeline_run(
207 build,
208 pipeline_id,
209 client.server_url(),
210 &encoded_path,
211 ))
212 }
213
214 async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
215 let parts: Vec<&str> = params.workflow_id.split("__").collect();
216 if parts.len() != 3 {
217 return Err(PluginError::InvalidConfig(format!(
218 "Invalid workflow ID format: {}",
219 params.workflow_id
220 )));
221 }
222
223 let job_path = parts[2];
224
225 let mut form_data = Vec::new();
226 if let Some(inputs) = params.inputs {
227 if let Some(obj) = inputs.as_object() {
228 for (k, v) in obj.iter() {
229 if v.is_null() {
230 continue;
231 }
232
233 if v.is_array() {
234 if let Some(arr) = v.as_array() {
235 for item in arr {
236 if item.is_null() {
237 continue;
238 }
239 let value_str = if item.is_boolean() {
240 item.as_bool().unwrap().to_string()
241 } else if item.is_number() {
242 item.to_string()
243 } else {
244 item.as_str()
245 .map(|s| s.to_string())
246 .unwrap_or_else(|| item.to_string())
247 };
248 form_data.push((k.clone(), value_str));
249 }
250 }
251 } else if v.is_boolean() {
252 form_data.push((k.clone(), v.as_bool().unwrap().to_string()));
253 } else if v.is_number() {
254 form_data.push((k.clone(), v.to_string()));
255 } else if let Some(s) = v.as_str() {
256 form_data.push((k.clone(), s.to_string()));
257 }
258 }
259 }
260 }
261
262 if form_data.is_empty() {
263 form_data.push(("json".to_string(), serde_json::json!({}).to_string()));
264 }
265
266 let client = self.client()?;
267 client.trigger_build(job_path, form_data).await?;
268
269 Ok(serde_json::json!({
270 "message": format!("Triggered build for job {job_path}"),
271 "job_path": job_path
272 })
273 .to_string())
274 }
275
276 async fn fetch_workflow_parameters(
277 &self, workflow_id: &str,
278 ) -> PluginResult<Vec<WorkflowParameter>> {
279 let start = std::time::Instant::now();
280 tracing::debug!(workflow_id = %workflow_id, "Fetching Jenkins workflow parameters");
281
282 let parts: Vec<&str> = workflow_id.split("__").collect();
283 if parts.len() != 3 {
284 return Err(PluginError::InvalidConfig(format!(
285 "Invalid workflow ID format: {workflow_id}"
286 )));
287 }
288
289 let job_path = parts[2];
290 let client = self.client()?;
291 let response = client.fetch_job_parameters(job_path).await?;
292
293 let param_definitions: Vec<_> = response
294 .property
295 .into_iter()
296 .filter(|prop| {
297 prop._class
298 .as_ref()
299 .map(|c| {
300 c.contains("ParametersDefinitionProperty")
301 || c.contains("ParametersProperty")
302 })
303 .unwrap_or(true)
304 })
305 .flat_map(|prop| prop.parameter_definitions)
306 .collect();
307
308 let parameters = mapper::parameter_definitions_to_workflow_parameters(param_definitions);
309
310 tracing::debug!(
311 count = parameters.len(),
312 elapsed = ?start.elapsed(),
313 "Processed Jenkins workflow parameters"
314 );
315 Ok(parameters)
316 }
317
318 async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
319 Ok(vec![Organization {
320 id: "default".to_string(),
321 name: "Jenkins Server".to_string(),
322 description: Some("All accessible Jenkins jobs".to_string()),
323 }])
324 }
325
326 async fn fetch_available_pipelines_filtered(
327 &self, _org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
328 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
329 let params = params.unwrap_or_default();
330 let client = self.client()?;
331 let all_jobs = client.discover_all_jobs().await?;
332 let mut all_pipelines = client.discovered_jobs_to_available_pipelines(all_jobs);
333
334 if let Some(search_term) = search {
335 let search_lower = search_term.to_lowercase();
336 all_pipelines.retain(|p| {
337 p.name.to_lowercase().contains(&search_lower)
338 || p.id.to_lowercase().contains(&search_lower)
339 || p.description
340 .as_ref()
341 .is_some_and(|d| d.to_lowercase().contains(&search_lower))
342 });
343 }
344
345 let total_count = all_pipelines.len();
346 let start = ((params.page - 1) * params.page_size).min(total_count);
347 let end = (start + params.page_size).min(total_count);
348 let items = all_pipelines[start..end].to_vec();
349
350 Ok(PaginatedResponse::new(
351 items,
352 params.page,
353 params.page_size,
354 total_count,
355 ))
356 }
357
358 async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
359 let parts: Vec<&str> = pipeline_id.split("__").collect();
360 if parts.len() != 3 {
361 return Err(PluginError::InvalidConfig(format!(
362 "Invalid pipeline ID format: {pipeline_id}"
363 )));
364 }
365
366 let job_path = parts[2];
367 let client = self.client()?;
368 client.cancel_build(job_path, run_number).await
369 }
370}