pipedash_plugin_buildkite/
plugin.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9 client,
10 config,
11 mapper,
12 metadata,
13};
14
/// State held by the Buildkite plugin.
///
/// `client` and `provider_id` are `None` on a freshly constructed plugin and
/// are filled in by `initialize`.
pub struct BuildkitePlugin {
    /// Metadata handed out by reference from `Plugin::metadata`.
    metadata: PluginMetadata,
    /// Buildkite API client; `None` until `initialize` has succeeded.
    client: Option<client::BuildkiteClient>,
    /// Provider identifier passed to `initialize`; `None` before that.
    provider_id: Option<i64>,
    /// Configuration map stored by `initialize` (empty before that).
    config: HashMap<String, String>,
}
21
22impl Default for BuildkitePlugin {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl BuildkitePlugin {
29 pub fn new() -> Self {
30 Self {
31 metadata: metadata::create_metadata(),
32 client: None,
33 provider_id: None,
34 config: HashMap::new(),
35 }
36 }
37
38 fn client(&self) -> PluginResult<&client::BuildkiteClient> {
39 self.client
40 .as_ref()
41 .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
42 }
43}
44
45#[async_trait]
46impl Plugin for BuildkitePlugin {
47 fn metadata(&self) -> &PluginMetadata {
48 &self.metadata
49 }
50
51 fn initialize(
52 &mut self, provider_id: i64, config: HashMap<String, String>,
53 http_client: Option<std::sync::Arc<reqwest::Client>>,
54 ) -> PluginResult<()> {
55 let token = config
56 .get("token")
57 .ok_or_else(|| PluginError::InvalidConfig("Missing Buildkite API token".to_string()))?
58 .to_string();
59
60 let client = http_client.unwrap_or_else(|| {
61 std::sync::Arc::new(
62 reqwest::Client::builder()
63 .use_rustls_tls()
64 .pool_max_idle_per_host(10)
65 .timeout(Duration::from_secs(30))
66 .connect_timeout(Duration::from_secs(10))
67 .tcp_keepalive(Duration::from_secs(60))
68 .build()
69 .expect("Failed to build HTTP client"),
70 )
71 });
72
73 self.client = Some(client::BuildkiteClient::new(client, token));
74 self.provider_id = Some(provider_id);
75 self.config = config;
76
77 Ok(())
78 }
79
80 async fn validate_credentials(&self) -> PluginResult<bool> {
81 let client = self.client()?;
82
83 client.fetch_organizations().await?;
84
85 Ok(true)
86 }
87
88 async fn fetch_available_pipelines(
89 &self, params: Option<PaginationParams>,
90 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
91 let client = self.client()?;
92 client::fetch_all_available_pipelines(client, params).await
93 }
94
95 async fn fetch_organizations(&self) -> PluginResult<Vec<pipedash_plugin_api::Organization>> {
96 let client = self.client()?;
97 let orgs = client.fetch_organizations().await?;
98
99 Ok(orgs
100 .into_iter()
101 .map(|org| pipedash_plugin_api::Organization {
102 id: org.slug.clone(),
103 name: org.name,
104 description: org.description,
105 })
106 .collect())
107 }
108
109 async fn fetch_available_pipelines_filtered(
110 &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
111 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
112 let client = self.client()?;
113 client::fetch_available_pipelines_filtered(client, org, search, params).await
114 }
115
116 async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
117 let provider_id = self
118 .provider_id
119 .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
120
121 let (org, pipeline_slugs) = config::parse_selected_items(&self.config)?;
122
123 if pipeline_slugs.is_empty() {
124 return Err(PluginError::InvalidConfig(
125 "No pipelines configured".to_string(),
126 ));
127 }
128
129 let client = self.client()?;
130 let futures = pipeline_slugs
131 .into_iter()
132 .map(|slug| client.fetch_pipeline(provider_id, org.clone(), slug));
133
134 let results = join_all(futures).await;
135
136 let mut all_pipelines = Vec::new();
137 let mut errors = Vec::new();
138
139 for result in results {
140 match result {
141 Ok(pipeline) => all_pipelines.push(pipeline),
142 Err(e) => errors.push(e),
143 }
144 }
145
146 if !errors.is_empty() && all_pipelines.is_empty() {
147 return Err(errors.into_iter().next().unwrap());
148 }
149
150 Ok(all_pipelines)
151 }
152
153 async fn fetch_run_history(
154 &self, pipeline_id: &str, limit: usize,
155 ) -> PluginResult<Vec<PipelineRun>> {
156 let parts: Vec<&str> = pipeline_id.split("__").collect();
157 if parts.len() != 4 {
158 return Err(PluginError::InvalidConfig(format!(
159 "Invalid pipeline ID format: {pipeline_id}"
160 )));
161 }
162
163 let org = parts[2];
164 let slug = parts[3];
165
166 let client = self.client()?;
167 let builds = client.fetch_builds(org, slug, limit).await?;
168
169 let pipeline_runs = builds
170 .into_iter()
171 .map(|build| client::build_to_pipeline_run(build, pipeline_id))
172 .collect();
173
174 Ok(pipeline_runs)
175 }
176
177 async fn fetch_run_details(
178 &self, pipeline_id: &str, run_number: i64,
179 ) -> PluginResult<PipelineRun> {
180 let parts: Vec<&str> = pipeline_id.split("__").collect();
181 if parts.len() != 4 {
182 return Err(PluginError::InvalidConfig(format!(
183 "Invalid pipeline ID format: {pipeline_id}"
184 )));
185 }
186
187 let org = parts[2];
188 let slug = parts[3];
189
190 let client = self.client()?;
191 let builds = client.fetch_builds(org, slug, 100).await?;
192
193 let build = builds
194 .into_iter()
195 .find(|b| b.number == run_number)
196 .ok_or_else(|| {
197 PluginError::PipelineNotFound(format!(
198 "Build #{run_number} not found for pipeline {pipeline_id}"
199 ))
200 })?;
201
202 Ok(client::build_to_pipeline_run(build, pipeline_id))
203 }
204
205 async fn fetch_workflow_parameters(
206 &self, _workflow_id: &str,
207 ) -> PluginResult<Vec<WorkflowParameter>> {
208 Ok(vec![WorkflowParameter {
209 name: "branch".to_string(),
210 label: Some("Branch".to_string()),
211 description: Some("Branch to build".to_string()),
212 param_type: WorkflowParameterType::String {
213 default: Some("main".to_string()),
214 },
215 required: true,
216 }])
217 }
218
219 async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
220 let parts: Vec<&str> = params.workflow_id.split("__").collect();
221 if parts.len() != 4 {
222 return Err(PluginError::InvalidConfig(format!(
223 "Invalid workflow ID format: {}",
224 params.workflow_id
225 )));
226 }
227
228 let org = parts[2];
229 let slug = parts[3];
230
231 let branch = params
232 .inputs
233 .as_ref()
234 .and_then(|inputs| inputs.get("branch"))
235 .and_then(|v| v.as_str())
236 .unwrap_or("main")
237 .to_string();
238
239 let client = self.client()?;
240 let build = client
241 .trigger_build(org, slug, branch.clone(), params.inputs)
242 .await?;
243
244 Ok(serde_json::json!({
245 "message": format!("Triggered build #{} on branch {}", build.number, branch),
246 "build_number": build.number,
247 "build_url": build.web_url
248 })
249 .to_string())
250 }
251
252 async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
253 let (org, _) = config::parse_selected_items(&self.config)?;
254
255 let client = self.client()?;
256 let agents = client.fetch_agents(&org).await?;
257
258 Ok(agents.into_iter().map(mapper::map_agent).collect())
259 }
260
261 async fn fetch_artifacts(&self, run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
262 let (org, _) = config::parse_selected_items(&self.config)?;
263
264 let build_id = run_id
265 .strip_prefix("buildkite-build-")
266 .ok_or_else(|| PluginError::InvalidConfig(format!("Invalid run ID: {run_id}")))?;
267
268 let client = self.client()?;
269 let artifacts = client.fetch_artifacts(&org, build_id).await?;
270
271 Ok(artifacts
272 .into_iter()
273 .map(|artifact| client::artifact_to_build_artifact(artifact, run_id))
274 .collect())
275 }
276
277 async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
278 let parts: Vec<&str> = pipeline_id.split("__").collect();
279 if parts.len() != 4 {
280 return Err(PluginError::InvalidConfig(format!(
281 "Invalid pipeline ID format: {pipeline_id}"
282 )));
283 }
284
285 let org = parts[2];
286 let slug = parts[3];
287
288 let client = self.client()?;
289 client.cancel_build(org, slug, run_number).await
290 }
291}