pipedash_plugin_bitbucket/
plugin.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9    client,
10    config,
11    mapper,
12    metadata,
13    types,
14};
15
16pub struct BitbucketPlugin {
17    metadata: PluginMetadata,
18    client: Option<client::BitbucketClient>,
19    provider_id: Option<i64>,
20    config: HashMap<String, String>,
21}
22
23impl Default for BitbucketPlugin {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl BitbucketPlugin {
30    pub fn new() -> Self {
31        Self {
32            metadata: metadata::create_metadata(),
33            client: None,
34            provider_id: None,
35            config: HashMap::new(),
36        }
37    }
38
39    fn client(&self) -> PluginResult<&client::BitbucketClient> {
40        self.client
41            .as_ref()
42            .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
43    }
44
45    async fn fetch_all_repositories(&self) -> PluginResult<Vec<types::Repository>> {
46        let client = self.client()?;
47        let mut all_repos = Vec::new();
48        let mut page = 1;
49
50        const MAX_PAGES: usize = 50;
51
52        loop {
53            let params = PaginationParams {
54                page,
55                page_size: 100,
56            };
57            let response = client.list_all_repositories(&params).await?;
58
59            if response.items.is_empty() {
60                break;
61            }
62
63            all_repos.extend(response.items);
64
65            if !response.has_more || page >= MAX_PAGES {
66                break;
67            }
68            page += 1;
69        }
70
71        if let Some(selected) = config::parse_selected_items(&self.config) {
72            Ok(all_repos
73                .into_iter()
74                .filter(|r| selected.contains(&r.full_name))
75                .collect())
76        } else {
77            Ok(all_repos)
78        }
79    }
80
81    async fn find_pipeline_by_build_number(
82        &self, workspace: &str, repo_slug: &str, build_number: i64,
83    ) -> PluginResult<types::Pipeline> {
84        let client = self.client()?;
85        let pipelines = client.list_pipelines(workspace, repo_slug, 100).await?;
86
87        pipelines
88            .into_iter()
89            .find(|p| p.build_number == build_number)
90            .ok_or_else(|| {
91                PluginError::PipelineNotFound(format!(
92                    "Pipeline run #{} not found in recent 100 runs for {}/{}",
93                    build_number, workspace, repo_slug
94                ))
95            })
96    }
97}
98
99#[async_trait]
100impl Plugin for BitbucketPlugin {
101    fn metadata(&self) -> &PluginMetadata {
102        &self.metadata
103    }
104
105    fn provider_type(&self) -> &str {
106        "bitbucket"
107    }
108
109    fn initialize(
110        &mut self, provider_id: i64, config: HashMap<String, String>,
111        http_client: Option<std::sync::Arc<reqwest::Client>>,
112    ) -> PluginResult<()> {
113        let (email, api_token) = config::get_auth(&config)?;
114        let api_url = config::get_api_url();
115
116        let credentials = format!("{}:{}", email, api_token);
117        let encoded = base64::Engine::encode(
118            &base64::engine::general_purpose::STANDARD,
119            credentials.as_bytes(),
120        );
121        let auth_value = format!("Basic {}", encoded);
122
123        let client = http_client.unwrap_or_else(|| {
124            std::sync::Arc::new(
125                reqwest::Client::builder()
126                    .use_rustls_tls()
127                    .pool_max_idle_per_host(10)
128                    .timeout(Duration::from_secs(30))
129                    .connect_timeout(Duration::from_secs(10))
130                    .tcp_keepalive(Duration::from_secs(60))
131                    .build()
132                    .expect("Failed to build HTTP client"),
133            )
134        });
135
136        self.client = Some(client::BitbucketClient::new(client, api_url, auth_value));
137        self.provider_id = Some(provider_id);
138        self.config = config;
139
140        Ok(())
141    }
142
143    async fn validate_credentials(&self) -> PluginResult<bool> {
144        let client = self.client()?;
145        client.get_user().await?;
146        Ok(true)
147    }
148
149    async fn fetch_organizations(&self) -> PluginResult<Vec<Organization>> {
150        let client = self.client()?;
151        let workspaces = client.list_workspaces().await?;
152
153        Ok(workspaces
154            .into_iter()
155            .map(|w| Organization {
156                id: w.uuid,
157                name: w.name,
158                description: Some(w.slug),
159            })
160            .collect())
161    }
162
163    async fn fetch_available_pipelines(
164        &self, params: Option<PaginationParams>,
165    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
166        let client = self.client()?;
167        let params = params.unwrap_or_default();
168
169        let response = client.list_all_repositories(&params).await?;
170
171        let available = response
172            .items
173            .iter()
174            .map(mapper::map_available_pipeline)
175            .collect();
176
177        Ok(PaginatedResponse::new(
178            available,
179            response.page,
180            response.page_size,
181            response.total_count,
182        ))
183    }
184
185    async fn fetch_available_pipelines_filtered(
186        &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
187    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
188        let client = self.client()?;
189        let params = params.unwrap_or_default();
190
191        let response = if let Some(workspace) = &org {
192            client.list_repositories(workspace, &params).await?
193        } else {
194            client.list_all_repositories(&params).await?
195        };
196
197        let mut available: Vec<AvailablePipeline> = response
198            .items
199            .iter()
200            .map(mapper::map_available_pipeline)
201            .collect();
202
203        if let Some(search_term) = search {
204            let search_lower = search_term.to_lowercase();
205            available.retain(|p| {
206                p.name.to_lowercase().contains(&search_lower)
207                    || p.id.to_lowercase().contains(&search_lower)
208            });
209        }
210
211        Ok(PaginatedResponse::new(
212            available,
213            params.page,
214            params.page_size,
215            response.total_count,
216        ))
217    }
218
219    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
220        let provider_id = self
221            .provider_id
222            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
223
224        let client = self.client()?;
225        let repos = self.fetch_all_repositories().await?;
226
227        let pipeline_futures = repos.iter().map(|repo| async move {
228            let pipelines = client
229                .list_pipelines(&repo.workspace.slug, &repo.slug, 1)
230                .await
231                .ok()?;
232            let latest = pipelines.first();
233            Some(mapper::map_pipeline(repo, latest, provider_id))
234        });
235
236        let results: Vec<Option<Pipeline>> = join_all(pipeline_futures).await;
237        Ok(results.into_iter().flatten().collect())
238    }
239
240    async fn fetch_run_history(
241        &self, pipeline_id: &str, limit: usize,
242    ) -> PluginResult<Vec<PipelineRun>> {
243        let (provider_id, workspace, repo_slug) = config::parse_pipeline_id(pipeline_id)?;
244        let client = self.client()?;
245
246        let pipelines = client.list_pipelines(&workspace, &repo_slug, limit).await?;
247
248        Ok(pipelines
249            .iter()
250            .map(|p| mapper::map_pipeline_run(p, &workspace, &repo_slug, provider_id))
251            .collect())
252    }
253
254    async fn fetch_run_details(
255        &self, pipeline_id: &str, run_number: i64,
256    ) -> PluginResult<PipelineRun> {
257        let (provider_id, workspace, repo_slug) = config::parse_pipeline_id(pipeline_id)?;
258
259        let pipeline = self
260            .find_pipeline_by_build_number(&workspace, &repo_slug, run_number)
261            .await?;
262
263        Ok(mapper::map_pipeline_run(
264            &pipeline,
265            &workspace,
266            &repo_slug,
267            provider_id,
268        ))
269    }
270
271    async fn fetch_workflow_parameters(
272        &self, _workflow_id: &str,
273    ) -> PluginResult<Vec<WorkflowParameter>> {
274        Ok(vec![
275            WorkflowParameter {
276                name: "ref".to_string(),
277                label: Some("Branch".to_string()),
278                description: Some("Branch name to run pipeline on".to_string()),
279                param_type: WorkflowParameterType::String {
280                    default: Some("main".to_string()),
281                },
282                required: true,
283            },
284            WorkflowParameter {
285                name: "custom_pipeline".to_string(),
286                label: Some("Custom Pipeline".to_string()),
287                description: Some("Name of custom pipeline to run (optional)".to_string()),
288                param_type: WorkflowParameterType::String { default: None },
289                required: false,
290            },
291        ])
292    }
293
294    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
295        let (_, workspace, repo_slug) = config::parse_pipeline_id(&params.workflow_id)?;
296        let client = self.client()?;
297
298        let ref_name = params
299            .inputs
300            .as_ref()
301            .and_then(|i| i.get("ref"))
302            .and_then(|v| v.as_str())
303            .unwrap_or("main")
304            .to_string();
305
306        let custom_pipeline = params
307            .inputs
308            .as_ref()
309            .and_then(|i| i.get("custom_pipeline"))
310            .and_then(|v| v.as_str())
311            .filter(|s| !s.is_empty());
312
313        let request = types::TriggerPipelineRequest {
314            target: types::TriggerTarget {
315                target_type: "pipeline_ref_target".to_string(),
316                ref_name,
317                ref_type: "branch".to_string(),
318                selector: custom_pipeline.map(|name| types::TriggerSelector {
319                    selector_type: "custom".to_string(),
320                    pattern: Some(name.to_string()),
321                }),
322            },
323        };
324
325        let pipeline = client
326            .trigger_pipeline(&workspace, &repo_slug, request)
327            .await?;
328
329        let url = pipeline.links.html.map(|l| l.href).unwrap_or_else(|| {
330            format!(
331                "https://bitbucket.org/{}/{}/pipelines/results/{}",
332                workspace, repo_slug, pipeline.build_number
333            )
334        });
335
336        Ok(url)
337    }
338
339    async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
340        let (_, workspace, repo_slug) = config::parse_pipeline_id(pipeline_id)?;
341        let client = self.client()?;
342
343        let pipeline = self
344            .find_pipeline_by_build_number(&workspace, &repo_slug, run_number)
345            .await?;
346
347        let state = pipeline.state.name.as_str();
348        if state == "PAUSED" || state == "HALTED" {
349            return Err(PluginError::ApiError(format!(
350                "Cannot cancel pipeline in '{}' state. Pipelines waiting for manual approval \
351                 must be cancelled directly in Bitbucket UI.",
352                state
353            )));
354        }
355
356        if state == "COMPLETED" {
357            return Err(PluginError::ApiError(
358                "Cannot cancel a completed pipeline.".to_string(),
359            ));
360        }
361
362        if state == "IN_PROGRESS" {
363            let pipeline_is_paused = pipeline
364                .state
365                .stage
366                .as_ref()
367                .map(|s| s.name == "PAUSED")
368                .unwrap_or(false);
369
370            if pipeline_is_paused {
371                return Err(PluginError::ApiError(
372                    "Cannot cancel pipeline waiting for manual approval. \
373                     Please cancel directly in Bitbucket UI."
374                        .to_string(),
375                ));
376            }
377
378            let steps = client
379                .list_steps(&workspace, &repo_slug, &pipeline.uuid)
380                .await?;
381
382            let has_paused_step = steps.iter().any(|step| {
383                step.state.name == "PENDING"
384                    && step
385                        .state
386                        .stage
387                        .as_ref()
388                        .map(|s| s.name == "PAUSED")
389                        .unwrap_or(false)
390            });
391
392            if has_paused_step {
393                return Err(PluginError::ApiError(
394                    "Cannot cancel pipeline with steps waiting for manual approval. \
395                     Please cancel directly in Bitbucket UI."
396                        .to_string(),
397                ));
398            }
399        }
400
401        client
402            .stop_pipeline(&workspace, &repo_slug, &pipeline.uuid)
403            .await
404    }
405
406    async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
407        Err(PluginError::NotSupported(
408            "Bitbucket Pipelines uses Atlassian infrastructure - no agent monitoring".to_string(),
409        ))
410    }
411
412    async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
413        Err(PluginError::NotSupported(
414            "Artifact fetching not yet implemented for Bitbucket".to_string(),
415        ))
416    }
417
418    async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
419        Err(PluginError::NotSupported(
420            "Build queues not supported by Bitbucket Pipelines".to_string(),
421        ))
422    }
423
424    fn get_migrations(&self) -> Vec<String> {
425        vec![]
426    }
427}