pipedash_plugin_bitbucket/
plugin.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9 client,
10 config,
11 mapper,
12 metadata,
13 types,
14};
15
16pub struct BitbucketPlugin {
17 metadata: PluginMetadata,
18 client: Option<client::BitbucketClient>,
19 provider_id: Option<i64>,
20 config: HashMap<String, String>,
21}
22
23impl Default for BitbucketPlugin {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl BitbucketPlugin {
30 pub fn new() -> Self {
31 Self {
32 metadata: metadata::create_metadata(),
33 client: None,
34 provider_id: None,
35 config: HashMap::new(),
36 }
37 }
38
39 fn client(&self) -> PluginResult<&client::BitbucketClient> {
40 self.client
41 .as_ref()
42 .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
43 }
44
45 async fn fetch_all_repositories(&self) -> PluginResult<Vec<types::Repository>> {
46 let client = self.client()?;
47 let mut all_repos = Vec::new();
48 let mut page = 1;
49
50 const MAX_PAGES: usize = 50;
51
52 loop {
53 let params = PaginationParams {
54 page,
55 page_size: 100,
56 };
57 let response = client.list_all_repositories(¶ms).await?;
58
59 if response.items.is_empty() {
60 break;
61 }
62
63 all_repos.extend(response.items);
64
65 if !response.has_more || page >= MAX_PAGES {
66 break;
67 }
68 page += 1;
69 }
70
71 if let Some(selected) = config::parse_selected_items(&self.config) {
72 Ok(all_repos
73 .into_iter()
74 .filter(|r| selected.contains(&r.full_name))
75 .collect())
76 } else {
77 Ok(all_repos)
78 }
79 }
80
81 async fn find_pipeline_by_build_number(
82 &self, workspace: &str, repo_slug: &str, build_number: i64,
83 ) -> PluginResult<types::Pipeline> {
84 let client = self.client()?;
85 let pipelines = client.list_pipelines(workspace, repo_slug, 100).await?;
86
87 pipelines
88 .into_iter()
89 .find(|p| p.build_number == build_number)
90 .ok_or_else(|| {
91 PluginError::PipelineNotFound(format!(
92 "Pipeline run #{} not found in recent 100 runs for {}/{}",
93 build_number, workspace, repo_slug
94 ))
95 })
96 }
97}
98
99#[async_trait]
100impl Plugin for BitbucketPlugin {
101 fn metadata(&self) -> &PluginMetadata {
102 &self.metadata
103 }
104
105 fn provider_type(&self) -> &str {
106 "bitbucket"
107 }
108
109 fn initialize(
110 &mut self, provider_id: i64, config: HashMap<String, String>,
111 http_client: Option<std::sync::Arc<reqwest::Client>>,
112 ) -> PluginResult<()> {
113 let (email, api_token) = config::get_auth(&config)?;
114 let api_url = config::get_api_url();
115
116 let credentials = format!("{}:{}", email, api_token);
117 let encoded = base64::Engine::encode(
118 &base64::engine::general_purpose::STANDARD,
119 credentials.as_bytes(),
120 );
121 let auth_value = format!("Basic {}", encoded);
122
123 let client = http_client.unwrap_or_else(|| {
124 std::sync::Arc::new(
125 reqwest::Client::builder()
126 .use_rustls_tls()
127 .pool_max_idle_per_host(10)
128 .timeout(Duration::from_secs(30))
129 .connect_timeout(Duration::from_secs(10))
130 .tcp_keepalive(Duration::from_secs(60))
131 .build()
132 .expect("Failed to build HTTP client"),
133 )
134 });
135
136 self.client = Some(client::BitbucketClient::new(client, api_url, auth_value));
137 self.provider_id = Some(provider_id);
138 self.config = config;
139
140 Ok(())
141 }
142
143 async fn validate_credentials(&self) -> PluginResult<bool> {
144 let client = self.client()?;
145 client.get_user().await?;
146 Ok(true)
147 }
148
149 async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
150 let client = self.client()?;
151 let workspaces = client.list_workspaces().await?;
152
153 Ok(workspaces
154 .into_iter()
155 .map(|w| Organization {
156 id: w.uuid,
157 name: w.name,
158 description: Some(w.slug),
159 })
160 .collect())
161 }
162
163 async fn fetch_available_pipelines(
164 &self, params: Option<PaginationParams>,
165 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
166 let client = self.client()?;
167 let params = params.unwrap_or_default();
168
169 let response = client.list_all_repositories(¶ms).await?;
170
171 let available = response
172 .items
173 .iter()
174 .map(mapper::map_available_pipeline)
175 .collect();
176
177 Ok(PaginatedResponse::new(
178 available,
179 response.page,
180 response.page_size,
181 response.total_count,
182 ))
183 }
184
185 async fn fetch_available_pipelines_filtered(
186 &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
187 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
188 let client = self.client()?;
189 let params = params.unwrap_or_default();
190
191 let response = if let Some(workspace) = &org {
192 client.list_repositories(workspace, ¶ms).await?
193 } else {
194 client.list_all_repositories(¶ms).await?
195 };
196
197 let mut available: Vec<AvailablePipeline> = response
198 .items
199 .iter()
200 .map(mapper::map_available_pipeline)
201 .collect();
202
203 if let Some(search_term) = search {
204 let search_lower = search_term.to_lowercase();
205 available.retain(|p| {
206 p.name.to_lowercase().contains(&search_lower)
207 || p.id.to_lowercase().contains(&search_lower)
208 });
209 }
210
211 Ok(PaginatedResponse::new(
212 available,
213 params.page,
214 params.page_size,
215 response.total_count,
216 ))
217 }
218
219 async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
220 let provider_id = self
221 .provider_id
222 .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
223
224 let client = self.client()?;
225 let repos = self.fetch_all_repositories().await?;
226
227 let pipeline_futures = repos.iter().map(|repo| async move {
228 let pipelines = client
229 .list_pipelines(&repo.workspace.slug, &repo.slug, 1)
230 .await
231 .ok()?;
232 let latest = pipelines.first();
233 Some(mapper::map_pipeline(repo, latest, provider_id))
234 });
235
236 let results: Vec<Option<Pipeline>> = join_all(pipeline_futures).await;
237 Ok(results.into_iter().flatten().collect())
238 }
239
240 async fn fetch_run_history(
241 &self, pipeline_id: &str, limit: usize,
242 ) -> PluginResult<Vec<PipelineRun>> {
243 let (provider_id, workspace, repo_slug) = config::parse_pipeline_id(pipeline_id)?;
244 let client = self.client()?;
245
246 let pipelines = client.list_pipelines(&workspace, &repo_slug, limit).await?;
247
248 Ok(pipelines
249 .iter()
250 .map(|p| mapper::map_pipeline_run(p, &workspace, &repo_slug, provider_id))
251 .collect())
252 }
253
254 async fn fetch_run_details(
255 &self, pipeline_id: &str, run_number: i64,
256 ) -> PluginResult<PipelineRun> {
257 let (provider_id, workspace, repo_slug) = config::parse_pipeline_id(pipeline_id)?;
258
259 let pipeline = self
260 .find_pipeline_by_build_number(&workspace, &repo_slug, run_number)
261 .await?;
262
263 Ok(mapper::map_pipeline_run(
264 &pipeline,
265 &workspace,
266 &repo_slug,
267 provider_id,
268 ))
269 }
270
271 async fn fetch_workflow_parameters(
272 &self, _workflow_id: &str,
273 ) -> PluginResult<Vec<WorkflowParameter>> {
274 Ok(vec![
275 WorkflowParameter {
276 name: "ref".to_string(),
277 label: Some("Branch".to_string()),
278 description: Some("Branch name to run pipeline on".to_string()),
279 param_type: WorkflowParameterType::String {
280 default: Some("main".to_string()),
281 },
282 required: true,
283 },
284 WorkflowParameter {
285 name: "custom_pipeline".to_string(),
286 label: Some("Custom Pipeline".to_string()),
287 description: Some("Name of custom pipeline to run (optional)".to_string()),
288 param_type: WorkflowParameterType::String { default: None },
289 required: false,
290 },
291 ])
292 }
293
294 async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
295 let (_, workspace, repo_slug) = config::parse_pipeline_id(¶ms.workflow_id)?;
296 let client = self.client()?;
297
298 let ref_name = params
299 .inputs
300 .as_ref()
301 .and_then(|i| i.get("ref"))
302 .and_then(|v| v.as_str())
303 .unwrap_or("main")
304 .to_string();
305
306 let custom_pipeline = params
307 .inputs
308 .as_ref()
309 .and_then(|i| i.get("custom_pipeline"))
310 .and_then(|v| v.as_str())
311 .filter(|s| !s.is_empty());
312
313 let request = types::TriggerPipelineRequest {
314 target: types::TriggerTarget {
315 target_type: "pipeline_ref_target".to_string(),
316 ref_name,
317 ref_type: "branch".to_string(),
318 selector: custom_pipeline.map(|name| types::TriggerSelector {
319 selector_type: "custom".to_string(),
320 pattern: Some(name.to_string()),
321 }),
322 },
323 };
324
325 let pipeline = client
326 .trigger_pipeline(&workspace, &repo_slug, request)
327 .await?;
328
329 let url = pipeline.links.html.map(|l| l.href).unwrap_or_else(|| {
330 format!(
331 "https://bitbucket.org/{}/{}/pipelines/results/{}",
332 workspace, repo_slug, pipeline.build_number
333 )
334 });
335
336 Ok(url)
337 }
338
339 async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
340 let (_, workspace, repo_slug) = config::parse_pipeline_id(pipeline_id)?;
341 let client = self.client()?;
342
343 let pipeline = self
344 .find_pipeline_by_build_number(&workspace, &repo_slug, run_number)
345 .await?;
346
347 let state = pipeline.state.name.as_str();
348 if state == "PAUSED" || state == "HALTED" {
349 return Err(PluginError::ApiError(format!(
350 "Cannot cancel pipeline in '{}' state. Pipelines waiting for manual approval \
351 must be cancelled directly in Bitbucket UI.",
352 state
353 )));
354 }
355
356 if state == "COMPLETED" {
357 return Err(PluginError::ApiError(
358 "Cannot cancel a completed pipeline.".to_string(),
359 ));
360 }
361
362 if state == "IN_PROGRESS" {
363 let pipeline_is_paused = pipeline
364 .state
365 .stage
366 .as_ref()
367 .map(|s| s.name == "PAUSED")
368 .unwrap_or(false);
369
370 if pipeline_is_paused {
371 return Err(PluginError::ApiError(
372 "Cannot cancel pipeline waiting for manual approval. \
373 Please cancel directly in Bitbucket UI."
374 .to_string(),
375 ));
376 }
377
378 let steps = client
379 .list_steps(&workspace, &repo_slug, &pipeline.uuid)
380 .await?;
381
382 let has_paused_step = steps.iter().any(|step| {
383 step.state.name == "PENDING"
384 && step
385 .state
386 .stage
387 .as_ref()
388 .map(|s| s.name == "PAUSED")
389 .unwrap_or(false)
390 });
391
392 if has_paused_step {
393 return Err(PluginError::ApiError(
394 "Cannot cancel pipeline with steps waiting for manual approval. \
395 Please cancel directly in Bitbucket UI."
396 .to_string(),
397 ));
398 }
399 }
400
401 client
402 .stop_pipeline(&workspace, &repo_slug, &pipeline.uuid)
403 .await
404 }
405
406 async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
407 Err(PluginError::NotSupported(
408 "Bitbucket Pipelines uses Atlassian infrastructure - no agent monitoring".to_string(),
409 ))
410 }
411
412 async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
413 Err(PluginError::NotSupported(
414 "Artifact fetching not yet implemented for Bitbucket".to_string(),
415 ))
416 }
417
418 async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
419 Err(PluginError::NotSupported(
420 "Build queues not supported by Bitbucket Pipelines".to_string(),
421 ))
422 }
423
424 fn get_migrations(&self) -> Vec<String> {
425 vec![]
426 }
427}