pipedash_plugin_buildkite/
plugin.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9    client,
10    config,
11    mapper,
12    metadata,
13};
14
15pub struct BuildkitePlugin {
16    metadata: PluginMetadata,
17    client: Option<client::BuildkiteClient>,
18    provider_id: Option<i64>,
19    config: HashMap<String, String>,
20}
21
22impl Default for BuildkitePlugin {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl BuildkitePlugin {
29    pub fn new() -> Self {
30        Self {
31            metadata: metadata::create_metadata(),
32            client: None,
33            provider_id: None,
34            config: HashMap::new(),
35        }
36    }
37
38    fn client(&self) -> PluginResult<&client::BuildkiteClient> {
39        self.client
40            .as_ref()
41            .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
42    }
43}
44
45#[async_trait]
46impl Plugin for BuildkitePlugin {
47    fn metadata(&self) -> &PluginMetadata {
48        &self.metadata
49    }
50
51    fn initialize(
52        &mut self, provider_id: i64, config: HashMap<String, String>,
53        http_client: Option<std::sync::Arc<reqwest::Client>>,
54    ) -> PluginResult<()> {
55        let token = config
56            .get("token")
57            .ok_or_else(|| PluginError::InvalidConfig("Missing Buildkite API token".to_string()))?
58            .to_string();
59
60        let client = http_client.unwrap_or_else(|| {
61            std::sync::Arc::new(
62                reqwest::Client::builder()
63                    .use_rustls_tls()
64                    .pool_max_idle_per_host(10)
65                    .timeout(Duration::from_secs(30))
66                    .connect_timeout(Duration::from_secs(10))
67                    .tcp_keepalive(Duration::from_secs(60))
68                    .build()
69                    .expect("Failed to build HTTP client"),
70            )
71        });
72
73        self.client = Some(client::BuildkiteClient::new(client, token));
74        self.provider_id = Some(provider_id);
75        self.config = config;
76
77        Ok(())
78    }
79
80    async fn validate_credentials(&self) -> PluginResult<bool> {
81        let client = self.client()?;
82
83        client.fetch_organizations().await?;
84
85        Ok(true)
86    }
87
88    async fn fetch_available_pipelines(
89        &self, params: Option<PaginationParams>,
90    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
91        let client = self.client()?;
92        client::fetch_all_available_pipelines(client, params).await
93    }
94
95    async fn fetch_organizations(&self) -> PluginResult<Vec<pipedash_plugin_api::Organization>> {
96        let client = self.client()?;
97        let orgs = client.fetch_organizations().await?;
98
99        Ok(orgs
100            .into_iter()
101            .map(|org| pipedash_plugin_api::Organization {
102                id: org.slug.clone(),
103                name: org.name,
104                description: org.description,
105            })
106            .collect())
107    }
108
109    async fn fetch_available_pipelines_filtered(
110        &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
111    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
112        let client = self.client()?;
113        client::fetch_available_pipelines_filtered(client, org, search, params).await
114    }
115
116    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
117        let provider_id = self
118            .provider_id
119            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
120
121        let (org, pipeline_slugs) = config::parse_selected_items(&self.config)?;
122
123        if pipeline_slugs.is_empty() {
124            return Err(PluginError::InvalidConfig(
125                "No pipelines configured".to_string(),
126            ));
127        }
128
129        let client = self.client()?;
130        let futures = pipeline_slugs
131            .into_iter()
132            .map(|slug| client.fetch_pipeline(provider_id, org.clone(), slug));
133
134        let results = join_all(futures).await;
135
136        let mut all_pipelines = Vec::new();
137        let mut errors = Vec::new();
138
139        for result in results {
140            match result {
141                Ok(pipeline) => all_pipelines.push(pipeline),
142                Err(e) => errors.push(e),
143            }
144        }
145
146        if !errors.is_empty() && all_pipelines.is_empty() {
147            return Err(errors.into_iter().next().unwrap());
148        }
149
150        Ok(all_pipelines)
151    }
152
153    async fn fetch_run_history(
154        &self, pipeline_id: &str, limit: usize,
155    ) -> PluginResult<Vec<PipelineRun>> {
156        let parts: Vec<&str> = pipeline_id.split("__").collect();
157        if parts.len() != 4 {
158            return Err(PluginError::InvalidConfig(format!(
159                "Invalid pipeline ID format: {pipeline_id}"
160            )));
161        }
162
163        let org = parts[2];
164        let slug = parts[3];
165
166        let client = self.client()?;
167        let builds = client.fetch_builds(org, slug, limit).await?;
168
169        let pipeline_runs = builds
170            .into_iter()
171            .map(|build| client::build_to_pipeline_run(build, pipeline_id))
172            .collect();
173
174        Ok(pipeline_runs)
175    }
176
177    async fn fetch_run_details(
178        &self, pipeline_id: &str, run_number: i64,
179    ) -> PluginResult<PipelineRun> {
180        let parts: Vec<&str> = pipeline_id.split("__").collect();
181        if parts.len() != 4 {
182            return Err(PluginError::InvalidConfig(format!(
183                "Invalid pipeline ID format: {pipeline_id}"
184            )));
185        }
186
187        let org = parts[2];
188        let slug = parts[3];
189
190        let client = self.client()?;
191        let builds = client.fetch_builds(org, slug, 100).await?;
192
193        let build = builds
194            .into_iter()
195            .find(|b| b.number == run_number)
196            .ok_or_else(|| {
197                PluginError::PipelineNotFound(format!(
198                    "Build #{run_number} not found for pipeline {pipeline_id}"
199                ))
200            })?;
201
202        Ok(client::build_to_pipeline_run(build, pipeline_id))
203    }
204
205    async fn fetch_workflow_parameters(
206        &self, _workflow_id: &str,
207    ) -> PluginResult<Vec<WorkflowParameter>> {
208        Ok(vec![WorkflowParameter {
209            name: "branch".to_string(),
210            label: Some("Branch".to_string()),
211            description: Some("Branch to build".to_string()),
212            param_type: WorkflowParameterType::String {
213                default: Some("main".to_string()),
214            },
215            required: true,
216        }])
217    }
218
219    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
220        let parts: Vec<&str> = params.workflow_id.split("__").collect();
221        if parts.len() != 4 {
222            return Err(PluginError::InvalidConfig(format!(
223                "Invalid workflow ID format: {}",
224                params.workflow_id
225            )));
226        }
227
228        let org = parts[2];
229        let slug = parts[3];
230
231        let branch = params
232            .inputs
233            .as_ref()
234            .and_then(|inputs| inputs.get("branch"))
235            .and_then(|v| v.as_str())
236            .unwrap_or("main")
237            .to_string();
238
239        let client = self.client()?;
240        let build = client
241            .trigger_build(org, slug, branch.clone(), params.inputs)
242            .await?;
243
244        Ok(serde_json::json!({
245            "message": format!("Triggered build #{} on branch {}", build.number, branch),
246            "build_number": build.number,
247            "build_url": build.web_url
248        })
249        .to_string())
250    }
251
252    async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
253        let (org, _) = config::parse_selected_items(&self.config)?;
254
255        let client = self.client()?;
256        let agents = client.fetch_agents(&org).await?;
257
258        Ok(agents.into_iter().map(mapper::map_agent).collect())
259    }
260
261    async fn fetch_artifacts(&self, run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
262        let (org, _) = config::parse_selected_items(&self.config)?;
263
264        let build_id = run_id
265            .strip_prefix("buildkite-build-")
266            .ok_or_else(|| PluginError::InvalidConfig(format!("Invalid run ID: {run_id}")))?;
267
268        let client = self.client()?;
269        let artifacts = client.fetch_artifacts(&org, build_id).await?;
270
271        Ok(artifacts
272            .into_iter()
273            .map(|artifact| client::artifact_to_build_artifact(artifact, run_id))
274            .collect())
275    }
276
277    async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
278        let parts: Vec<&str> = pipeline_id.split("__").collect();
279        if parts.len() != 4 {
280            return Err(PluginError::InvalidConfig(format!(
281                "Invalid pipeline ID format: {pipeline_id}"
282            )));
283        }
284
285        let org = parts[2];
286        let slug = parts[3];
287
288        let client = self.client()?;
289        client.cancel_build(org, slug, run_number).await
290    }
291}