use std::sync::OnceLock;
use pipedash_plugin_api::{
PaginatedResponse,
PaginationParams,
PluginError,
PluginResult,
RetryPolicy,
};
use reqwest::StatusCode;
use crate::types::{
PaginatedResponse as BitbucketPaginatedResponse,
Pipeline,
PipelineStep,
Repository,
TriggerPipelineRequest,
User,
Workspace,
};
/// Client for the Bitbucket REST API.
///
/// Each method of this type sends its request with `auth_value` in the
/// `Authorization` header and runs it through `retry_policy`.
pub struct BitbucketClient {
/// Shared `reqwest` client used to send the HTTP requests.
http_client: std::sync::Arc<reqwest::Client>,
/// Base URL of the API; `new` stores it with trailing `/` characters removed.
api_url: String,
/// Value sent verbatim as the `Authorization` header.
auth_value: String,
/// Policy that wraps each request; `new` initialises it with `RetryPolicy::default()`.
retry_policy: RetryPolicy,
/// Result of the first successful `get_user` call, reused by later calls.
user_cache: OnceLock<User>,
}
impl BitbucketClient {
/// Builds a client for the Bitbucket API rooted at `api_url`.
///
/// Trailing `/` characters are stripped from `api_url` so endpoint paths can
/// be appended with a single separator. `auth_value` is later sent verbatim
/// as the `Authorization` header. The client starts with the default
/// `RetryPolicy` and an empty user cache.
pub fn new(
    http_client: std::sync::Arc<reqwest::Client>, api_url: String, auth_value: String,
) -> Self {
    let api_url = api_url.trim_end_matches('/').to_owned();
    let retry_policy = RetryPolicy::default();
    let user_cache = OnceLock::new();

    Self {
        http_client,
        api_url,
        auth_value,
        retry_policy,
        user_cache,
    }
}
/// Returns the user behind the configured credentials.
///
/// The first successful reply from `GET {api_url}/user` is stored in the
/// client's cache; later calls return a clone of the cached value without
/// sending a request. Failed lookups are not cached.
///
/// # Errors
///
/// `PluginError::NetworkError` when the request cannot be sent, otherwise
/// whatever `handle_response` reports for the reply.
pub async fn get_user(&self) -> PluginResult<User> {
    if let Some(cached) = self.user_cache.get() {
        return Ok(cached.clone());
    }

    let endpoint = format!("{}/user", self.api_url);
    let fetched: User = self
        .retry_policy
        .retry(|| async {
            let reply = self
                .http_client
                .get(&endpoint)
                .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                .send()
                .await
                .map_err(|err| PluginError::NetworkError(format!("Failed to get user: {}", err)))?;
            self.handle_response(reply).await
        })
        .await?;

    // `set` fails only when the cache was filled in the meantime; the freshly
    // fetched value is still returned in that case.
    let _ = self.user_cache.set(fetched.clone());
    Ok(fetched)
}
/// Collects workspaces by walking the paginated `/workspaces` listing.
///
/// Starts at `{api_url}/workspaces?pagelen=100` and follows each page's
/// `next` link. At most `MAX_PAGES` pages are fetched; anything beyond that
/// is left out of the result without an error.
///
/// # Errors
///
/// Stops at the first page that fails, returning
/// `PluginError::NetworkError` for send failures or the error produced by
/// `handle_response`.
pub async fn list_workspaces(&self) -> PluginResult<Vec<Workspace>> {
    const MAX_PAGES: usize = 10;

    let mut workspaces = Vec::new();
    let mut pending = Some(format!("{}/workspaces?pagelen=100", self.api_url));

    for _ in 0..MAX_PAGES {
        // No `next` link on the previous page means the listing is complete.
        let Some(page_url) = pending.take() else {
            break;
        };

        let page: BitbucketPaginatedResponse<Workspace> = self
            .retry_policy
            .retry(|| async {
                let reply = self
                    .http_client
                    .get(&page_url)
                    .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                    .send()
                    .await
                    .map_err(|err| {
                        PluginError::NetworkError(format!("Failed to list workspaces: {}", err))
                    })?;
                self.handle_response(reply).await
            })
            .await?;

        workspaces.extend(page.values);
        pending = page.next;
    }

    Ok(workspaces)
}
/// Fetches one page of the repositories in `workspace`.
///
/// `params.page_size` and `params.page` are forwarded as the `pagelen` and
/// `page` query parameters. The returned page reports the `size` from the
/// reply as its total count; when the reply carries no `size`, the number of
/// items on this page is used instead.
///
/// # Errors
///
/// `PluginError::NetworkError` when the request cannot be sent, otherwise
/// whatever `handle_response` reports for the reply.
pub async fn list_repositories(
    &self, workspace: &str, params: &PaginationParams,
) -> PluginResult<PaginatedResponse<Repository>> {
    let endpoint = format!(
        "{}/repositories/{}?pagelen={}&page={}",
        self.api_url, workspace, params.page_size, params.page
    );

    self.retry_policy
        .retry(|| async {
            let reply = self
                .http_client
                .get(&endpoint)
                .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                .send()
                .await
                .map_err(|err| {
                    PluginError::NetworkError(format!("Failed to list repositories: {}", err))
                })?;

            let page: BitbucketPaginatedResponse<Repository> =
                self.handle_response(reply).await?;

            // Computed before `page.values` is moved into the result.
            let total_count = match page.size {
                Some(reported) => reported,
                None => page.values.len(),
            };

            Ok(PaginatedResponse::new(
                page.values,
                params.page,
                params.page_size,
                total_count,
            ))
        })
        .await
}
/// Fetches one page of the `/repositories` listing filtered with
/// `role=member`.
///
/// `params.page_size` and `params.page` are forwarded as the `pagelen` and
/// `page` query parameters. The returned page reports the `size` from the
/// reply as its total count; when the reply carries no `size`, the number of
/// items on this page is used instead.
///
/// # Errors
///
/// `PluginError::NetworkError` when the request cannot be sent, otherwise
/// whatever `handle_response` reports for the reply.
pub async fn list_all_repositories(
    &self, params: &PaginationParams,
) -> PluginResult<PaginatedResponse<Repository>> {
    let endpoint = format!(
        "{}/repositories?role=member&pagelen={}&page={}",
        self.api_url, params.page_size, params.page
    );

    self.retry_policy
        .retry(|| async {
            let reply = self
                .http_client
                .get(&endpoint)
                .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                .send()
                .await
                .map_err(|err| {
                    PluginError::NetworkError(format!("Failed to list repositories: {}", err))
                })?;

            let page: BitbucketPaginatedResponse<Repository> =
                self.handle_response(reply).await?;

            // Computed before `page.values` is moved into the result.
            let total_count = match page.size {
                Some(reported) => reported,
                None => page.values.len(),
            };

            Ok(PaginatedResponse::new(
                page.values,
                params.page,
                params.page_size,
                total_count,
            ))
        })
        .await
}
/// Lists pipelines of a repository, sorted with `sort=-created_on`.
///
/// A single page is requested whose `pagelen` is `limit` capped at 100, so
/// no more than 100 pipelines are asked for regardless of `limit`.
///
/// # Errors
///
/// `PluginError::NetworkError` when the request cannot be sent, otherwise
/// whatever `handle_response` reports for the reply.
pub async fn list_pipelines(
    &self, workspace: &str, repo_slug: &str, limit: usize,
) -> PluginResult<Vec<Pipeline>> {
    let page_size = std::cmp::min(limit, 100);
    let endpoint = format!(
        "{}/repositories/{}/{}/pipelines?pagelen={}&sort=-created_on",
        self.api_url, workspace, repo_slug, page_size
    );

    self.retry_policy
        .retry(|| async {
            let reply = self
                .http_client
                .get(&endpoint)
                .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                .send()
                .await
                .map_err(|err| {
                    PluginError::NetworkError(format!("Failed to list pipelines: {}", err))
                })?;

            self.handle_response::<BitbucketPaginatedResponse<Pipeline>>(reply)
                .await
                .map(|page| page.values)
        })
        .await
}
/// Lists the steps of one pipeline run.
///
/// `pipeline_uuid` may be given with or without its surrounding braces; it is
/// normalised to the `{uuid}` form and percent-encoded before being placed in
/// the request path.
///
/// # Errors
///
/// `PluginError::NetworkError` when the request cannot be sent, otherwise
/// whatever `handle_response` reports for the reply.
pub async fn list_steps(
    &self, workspace: &str, repo_slug: &str, pipeline_uuid: &str,
) -> PluginResult<Vec<PipelineStep>> {
    // Strip braces on both sides before wrapping, so a half-braced value
    // (`{abc` or `abc}`) still ends up as exactly `{abc}`.
    let uuid_with_braces = format!(
        "{{{}}}",
        pipeline_uuid
            .trim_start_matches('{')
            .trim_end_matches('}')
    );
    let encoded_uuid = urlencoding::encode(&uuid_with_braces);

    // Ask for the largest page explicitly: without `pagelen` the API applies
    // its default page size and pipelines with more steps come back truncated.
    let endpoint = format!(
        "{}/repositories/{}/{}/pipelines/{}/steps?pagelen=100",
        self.api_url, workspace, repo_slug, encoded_uuid
    );

    self.retry_policy
        .retry(|| async {
            let response = self
                .http_client
                .get(&endpoint)
                .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                .send()
                .await
                .map_err(|e| {
                    PluginError::NetworkError(format!("Failed to list pipeline steps: {}", e))
                })?;
            let paginated: BitbucketPaginatedResponse<PipelineStep> =
                self.handle_response(response).await?;
            Ok(paginated.values)
        })
        .await
}
/// Starts a pipeline by POSTing `request` as JSON to the repository's
/// `pipelines/` endpoint and returns the pipeline parsed from the reply.
///
/// NOTE(review): this POST runs under the same retry policy as the read-only
/// calls. If the policy retries after the server already accepted the
/// request, a second pipeline could be started — confirm which errors
/// `RetryPolicy` retries.
///
/// # Errors
///
/// `PluginError::NetworkError` when the request cannot be sent, otherwise
/// whatever `handle_response` reports for the reply.
pub async fn trigger_pipeline(
    &self, workspace: &str, repo_slug: &str, request: TriggerPipelineRequest,
) -> PluginResult<Pipeline> {
    let endpoint = format!(
        "{}/repositories/{}/{}/pipelines/",
        self.api_url, workspace, repo_slug
    );

    self.retry_policy
        .retry(|| async {
            let outgoing = self
                .http_client
                .post(&endpoint)
                .header(reqwest::header::AUTHORIZATION, &self.auth_value)
                .json(&request);

            let reply = outgoing.send().await.map_err(|err| {
                PluginError::NetworkError(format!("Failed to trigger pipeline: {}", err))
            })?;

            self.handle_response(reply).await
        })
        .await
}
pub async fn stop_pipeline(
&self, workspace: &str, repo_slug: &str, pipeline_uuid: &str,
) -> PluginResult<()> {
let uuid_with_braces = if pipeline_uuid.starts_with('{') {
pipeline_uuid.to_string()
} else {
format!("{{{}}}", pipeline_uuid)
};
let encoded_uuid = urlencoding::encode(&uuid_with_braces);
self.retry_policy
.retry(|| async {
let url = format!(
"{}/repositories/{}/{}/pipelines/{}/stopPipeline",
self.api_url, workspace, repo_slug, encoded_uuid
);
let response = self
.http_client
.post(&url)
.header(reqwest::header::AUTHORIZATION, &self.auth_value)
.send()
.await
.map_err(|e| {
PluginError::NetworkError(format!("Failed to stop pipeline: {}", e))
})?;
let status = response.status();
if status.is_success() {
Ok(())
} else {
let error_text = response.text().await.unwrap_or_default();
Err(PluginError::ApiError(format!(
"Failed to stop pipeline ({}): {}",
status, error_text
)))
}
})
.await
}
/// Converts an HTTP reply into either a parsed `T` or a `PluginError`.
///
/// Status handling, in order of precedence:
///
/// * 401 / 403 — `PluginError::AuthenticationFailed`, including the reply body.
/// * 404 — `PluginError::PipelineNotFound`; the body is not read.
/// * any other non-success status — `PluginError::ApiError`, including the
///   status and the reply body.
/// * success — the body is deserialised as JSON into `T`; a parse failure
///   becomes `PluginError::ApiError`.
///
/// When a reply body cannot be read, the text `"Unknown error"` is used in
/// its place.
async fn handle_response<T>(&self, response: reqwest::Response) -> PluginResult<T>
where
    T: serde::de::DeserializeOwned,
{
    // Cloned up front because reading the body consumes the response.
    let url = response.url().clone();

    match response.status() {
        StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
            let body = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());
            Err(PluginError::AuthenticationFailed(format!(
                "Authentication failed for {}: {}",
                url, body
            )))
        }
        StatusCode::NOT_FOUND => Err(PluginError::PipelineNotFound(format!(
            "Resource not found: {}",
            url
        ))),
        status if !status.is_success() => {
            let body = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());
            Err(PluginError::ApiError(format!(
                "Bitbucket API error ({}) for {}: {}",
                status, url, body
            )))
        }
        _ => response.json::<T>().await.map_err(|err| {
            PluginError::ApiError(format!(
                "Failed to parse Bitbucket API response from {}: {}",
                url, err
            ))
        }),
    }
}
}