use std::collections::HashMap;
use std::time::Duration;
use async_trait::async_trait;
use futures::future::join_all;
use pipedash_plugin_api::*;
use crate::{
client,
config,
mapper,
metadata,
};
pub struct BuildkitePlugin {
metadata: PluginMetadata,
client: Option<client::BuildkiteClient>,
provider_id: Option<i64>,
config: HashMap<String, String>,
}
impl Default for BuildkitePlugin {
fn default() -> Self {
Self::new()
}
}
impl BuildkitePlugin {
pub fn new() -> Self {
Self {
metadata: metadata::create_metadata(),
client: None,
provider_id: None,
config: HashMap::new(),
}
}
fn client(&self) -> PluginResult<&client::BuildkiteClient> {
self.client
.as_ref()
.ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
}
}
#[async_trait]
impl Plugin for BuildkitePlugin {
fn metadata(&self) -> &PluginMetadata {
&self.metadata
}
fn initialize(
&mut self, provider_id: i64, config: HashMap<String, String>,
http_client: Option<std::sync::Arc<reqwest::Client>>,
) -> PluginResult<()> {
let token = config
.get("token")
.ok_or_else(|| PluginError::InvalidConfig("Missing Buildkite API token".to_string()))?
.to_string();
let client = http_client.unwrap_or_else(|| {
std::sync::Arc::new(
reqwest::Client::builder()
.use_rustls_tls()
.pool_max_idle_per_host(10)
.timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.tcp_keepalive(Duration::from_secs(60))
.build()
.expect("Failed to build HTTP client"),
)
});
self.client = Some(client::BuildkiteClient::new(client, token));
self.provider_id = Some(provider_id);
self.config = config;
Ok(())
}
async fn validate_credentials(&self) -> PluginResult<bool> {
let client = self.client()?;
client.fetch_organizations().await?;
Ok(true)
}
async fn fetch_available_pipelines(
&self, params: Option<PaginationParams>,
) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
let client = self.client()?;
client::fetch_all_available_pipelines(client, params).await
}
async fn fetch_organizations(&self) -> PluginResult<Vec<pipedash_plugin_api::Organization>> {
let client = self.client()?;
let orgs = client.fetch_organizations().await?;
Ok(orgs
.into_iter()
.map(|org| pipedash_plugin_api::Organization {
id: org.slug.clone(),
name: org.name,
description: org.description,
})
.collect())
}
async fn fetch_available_pipelines_filtered(
&self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
let client = self.client()?;
client::fetch_available_pipelines_filtered(client, org, search, params).await
}
async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
let provider_id = self
.provider_id
.ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
let (org, pipeline_slugs) = config::parse_selected_items(&self.config)?;
if pipeline_slugs.is_empty() {
return Err(PluginError::InvalidConfig(
"No pipelines configured".to_string(),
));
}
let client = self.client()?;
let futures = pipeline_slugs
.into_iter()
.map(|slug| client.fetch_pipeline(provider_id, org.clone(), slug));
let results = join_all(futures).await;
let mut all_pipelines = Vec::new();
let mut errors = Vec::new();
for result in results {
match result {
Ok(pipeline) => all_pipelines.push(pipeline),
Err(e) => errors.push(e),
}
}
if !errors.is_empty() && all_pipelines.is_empty() {
return Err(errors.into_iter().next().unwrap());
}
Ok(all_pipelines)
}
async fn fetch_run_history(
&self, pipeline_id: &str, limit: usize,
) -> PluginResult<Vec<PipelineRun>> {
let parts: Vec<&str> = pipeline_id.split("__").collect();
if parts.len() != 4 {
return Err(PluginError::InvalidConfig(format!(
"Invalid pipeline ID format: {pipeline_id}"
)));
}
let org = parts[2];
let slug = parts[3];
let client = self.client()?;
let builds = client.fetch_builds(org, slug, limit).await?;
let pipeline_runs = builds
.into_iter()
.map(|build| client::build_to_pipeline_run(build, pipeline_id))
.collect();
Ok(pipeline_runs)
}
async fn fetch_run_details(
&self, pipeline_id: &str, run_number: i64,
) -> PluginResult<PipelineRun> {
let parts: Vec<&str> = pipeline_id.split("__").collect();
if parts.len() != 4 {
return Err(PluginError::InvalidConfig(format!(
"Invalid pipeline ID format: {pipeline_id}"
)));
}
let org = parts[2];
let slug = parts[3];
let client = self.client()?;
let builds = client.fetch_builds(org, slug, 100).await?;
let build = builds
.into_iter()
.find(|b| b.number == run_number)
.ok_or_else(|| {
PluginError::PipelineNotFound(format!(
"Build #{run_number} not found for pipeline {pipeline_id}"
))
})?;
Ok(client::build_to_pipeline_run(build, pipeline_id))
}
async fn fetch_workflow_parameters(
&self, _workflow_id: &str,
) -> PluginResult<Vec<WorkflowParameter>> {
Ok(vec![WorkflowParameter {
name: "branch".to_string(),
label: Some("Branch".to_string()),
description: Some("Branch to build".to_string()),
param_type: WorkflowParameterType::String {
default: Some("main".to_string()),
},
required: true,
}])
}
async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
let parts: Vec<&str> = params.workflow_id.split("__").collect();
if parts.len() != 4 {
return Err(PluginError::InvalidConfig(format!(
"Invalid workflow ID format: {}",
params.workflow_id
)));
}
let org = parts[2];
let slug = parts[3];
let branch = params
.inputs
.as_ref()
.and_then(|inputs| inputs.get("branch"))
.and_then(|v| v.as_str())
.unwrap_or("main")
.to_string();
let client = self.client()?;
let build = client
.trigger_build(org, slug, branch.clone(), params.inputs)
.await?;
Ok(serde_json::json!({
"message": format!("Triggered build #{} on branch {}", build.number, branch),
"build_number": build.number,
"build_url": build.web_url
})
.to_string())
}
async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
let (org, _) = config::parse_selected_items(&self.config)?;
let client = self.client()?;
let agents = client.fetch_agents(&org).await?;
Ok(agents.into_iter().map(mapper::map_agent).collect())
}
async fn fetch_artifacts(&self, run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
let (org, _) = config::parse_selected_items(&self.config)?;
let build_id = run_id
.strip_prefix("buildkite-build-")
.ok_or_else(|| PluginError::InvalidConfig(format!("Invalid run ID: {run_id}")))?;
let client = self.client()?;
let artifacts = client.fetch_artifacts(&org, build_id).await?;
Ok(artifacts
.into_iter()
.map(|artifact| client::artifact_to_build_artifact(artifact, run_id))
.collect())
}
async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
let parts: Vec<&str> = pipeline_id.split("__").collect();
if parts.len() != 4 {
return Err(PluginError::InvalidConfig(format!(
"Invalid pipeline ID format: {pipeline_id}"
)));
}
let org = parts[2];
let slug = parts[3];
let client = self.client()?;
client.cancel_build(org, slug, run_number).await
}
}