use std::time::Duration;
use progenitor_client::Error as RawError;
use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
use crate::Client;
use crate::types::{
CreateDeploymentObjectRequest, DeploymentObject, ErrorResponse, K8sEventHistoryResponse,
NewStack, PodLogHistoryResponse, Stack, WsConnectionsResponse,
};
use chrono::{DateTime, Utc};
use std::path::Path;
use uuid::Uuid;
#[derive(Debug)]
pub enum BrokkrError {
Api(ErrorResponse, reqwest::StatusCode),
Transport(reqwest::Error),
UnexpectedResponse {
status: Option<reqwest::StatusCode>,
detail: String,
},
InvalidRequest(String),
}
impl BrokkrError {
pub fn status(&self) -> Option<reqwest::StatusCode> {
match self {
Self::Api(_, status) => Some(*status),
Self::Transport(e) => e.status(),
Self::UnexpectedResponse { status, .. } => *status,
Self::InvalidRequest(_) => None,
}
}
pub fn code(&self) -> Option<&str> {
match self {
Self::Api(body, _) => Some(&body.code),
_ => None,
}
}
pub fn is_retryable(&self) -> bool {
match self {
Self::Transport(_) => true,
Self::Api(_, status) => is_retryable_status(*status),
Self::UnexpectedResponse {
status: Some(status),
..
} => is_retryable_status(*status),
_ => false,
}
}
}
impl std::fmt::Display for BrokkrError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Api(body, status) => {
write!(f, "{} {}: {}", status.as_u16(), body.code, body.message)
}
Self::Transport(e) => write!(f, "transport error: {e}"),
Self::UnexpectedResponse { status, detail } => match status {
Some(s) => write!(f, "unexpected response ({}): {}", s.as_u16(), detail),
None => write!(f, "unexpected response: {detail}"),
},
Self::InvalidRequest(msg) => write!(f, "invalid request: {msg}"),
}
}
}
impl std::error::Error for BrokkrError {}
impl From<RawError<ErrorResponse>> for BrokkrError {
fn from(err: RawError<ErrorResponse>) -> Self {
match err {
RawError::ErrorResponse(rv) => {
let status = rv.status();
Self::Api(rv.into_inner(), status)
}
RawError::CommunicationError(e)
| RawError::InvalidUpgrade(e)
| RawError::ResponseBodyError(e) => Self::Transport(e),
RawError::InvalidRequest(msg) => Self::InvalidRequest(msg),
RawError::InvalidResponsePayload(bytes, e) => Self::UnexpectedResponse {
status: None,
detail: format!(
"payload deserialization failed: {e} ({} bytes)",
bytes.len()
),
},
RawError::UnexpectedResponse(resp) => Self::UnexpectedResponse {
status: Some(resp.status()),
detail: "response not described in OpenAPI spec".to_string(),
},
RawError::Custom(s) => Self::InvalidRequest(s),
}
}
}
fn is_retryable_status(status: reqwest::StatusCode) -> bool {
matches!(status.as_u16(), 408 | 429 | 502 | 503 | 504)
}
#[derive(Debug)]
pub struct BrokkrClientBuilder {
base_url: String,
token: Option<String>,
request_timeout: Duration,
connect_timeout: Duration,
max_retries: u32,
initial_backoff: Duration,
}
impl BrokkrClientBuilder {
fn new(base_url: impl Into<String>) -> Self {
Self {
base_url: base_url.into(),
token: None,
request_timeout: Duration::from_secs(30),
connect_timeout: Duration::from_secs(10),
max_retries: 3,
initial_backoff: Duration::from_millis(200),
}
}
pub fn token(mut self, token: impl Into<String>) -> Self {
self.token = Some(token.into());
self
}
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = timeout;
self
}
pub fn max_retries(mut self, max: u32) -> Self {
self.max_retries = max;
self
}
pub fn initial_backoff(mut self, initial: Duration) -> Self {
self.initial_backoff = initial;
self
}
pub fn build(self) -> Result<BrokkrClient, BrokkrError> {
let mut headers = HeaderMap::new();
if let Some(token) = &self.token {
let value = HeaderValue::from_str(token).map_err(|e| {
BrokkrError::InvalidRequest(format!("invalid token header value: {e}"))
})?;
headers.insert(AUTHORIZATION, value);
}
let reqwest_client = reqwest::Client::builder()
.default_headers(headers)
.connect_timeout(self.connect_timeout)
.timeout(self.request_timeout)
.build()
.map_err(BrokkrError::Transport)?;
let inner = Client::new_with_client(&self.base_url, reqwest_client);
Ok(BrokkrClient {
inner,
max_retries: self.max_retries,
initial_backoff: self.initial_backoff,
})
}
}
#[derive(Debug, Clone)]
pub struct BrokkrClient {
inner: Client,
max_retries: u32,
initial_backoff: Duration,
}
impl BrokkrClient {
pub fn builder(base_url: impl Into<String>) -> BrokkrClientBuilder {
BrokkrClientBuilder::new(base_url)
}
pub fn api(&self) -> &Client {
&self.inner
}
pub async fn list_telemetry_events(
&self,
stack_id: Uuid,
since: Option<DateTime<Utc>>,
limit: Option<i64>,
) -> Result<K8sEventHistoryResponse, BrokkrError> {
let mut req = self.inner.list_telemetry_events().id(stack_id);
if let Some(since) = since {
req = req.since(since);
}
if let Some(limit) = limit {
req = req.limit(limit);
}
let resp = req.send().await?;
Ok(resp.into_inner())
}
pub async fn list_telemetry_logs(
&self,
stack_id: Uuid,
since: Option<DateTime<Utc>>,
limit: Option<i64>,
) -> Result<PodLogHistoryResponse, BrokkrError> {
let mut req = self.inner.list_telemetry_logs().id(stack_id);
if let Some(since) = since {
req = req.since(since);
}
if let Some(limit) = limit {
req = req.limit(limit);
}
let resp = req.send().await?;
Ok(resp.into_inner())
}
pub async fn list_ws_connections(&self) -> Result<WsConnectionsResponse, BrokkrError> {
let resp = self.inner.list_ws_connections().send().await?;
Ok(resp.into_inner())
}
pub async fn submit_manifests(
&self,
stack_id: Uuid,
path: impl AsRef<Path>,
) -> Result<DeploymentObject, BrokkrError> {
let yaml_content = read_manifests(path.as_ref())?;
let resp = self
.inner
.create_deployment_object()
.id(stack_id)
.body(CreateDeploymentObjectRequest {
yaml_content,
is_deletion_marker: Some(false),
})
.send()
.await?;
Ok(resp.into_inner())
}
pub async fn apply(
&self,
stack_name: &str,
path: impl AsRef<Path>,
targeting: &[String],
) -> Result<ApplyOutcome, BrokkrError> {
let yaml_content = read_manifests(path.as_ref())?;
let checksum = sha256_hex(&yaml_content);
let auth = self.inner.verify_pak().send().await?.into_inner();
let generator_id = auth
.generator
.ok_or_else(|| {
BrokkrError::InvalidRequest(
"apply by name requires a generator PAK; admin callers should create the \
stack explicitly and use submit_manifests"
.to_string(),
)
})
.and_then(|g| {
Uuid::parse_str(&g).map_err(|e| BrokkrError::UnexpectedResponse {
status: None,
detail: format!("auth response generator id is not a UUID: {e}"),
})
})?;
let stacks: Vec<Stack> = self.inner.list_stacks().send().await?.into_inner();
let stack = match stacks.into_iter().find(|s| s.name == stack_name) {
Some(s) => s,
None => self
.inner
.create_stack()
.body(NewStack {
name: stack_name.to_string(),
generator_id,
description: None,
})
.send()
.await?
.into_inner(),
};
for label in targeting {
if let Err(e) = self
.inner
.stacks_add_label()
.id(stack.id)
.body(label.clone())
.send()
.await
{
let err = BrokkrError::from(e);
if err.status() != Some(reqwest::StatusCode::CONFLICT) {
return Err(err);
}
}
}
let objects: Vec<DeploymentObject> = self
.inner
.list_deployment_objects()
.id(stack.id)
.send()
.await?
.into_inner();
let had_prior = !objects.is_empty();
let already_current = objects
.iter()
.max_by_key(|o| o.sequence_id)
.map(|latest| latest.yaml_checksum == checksum)
.unwrap_or(false);
if already_current {
return Ok(ApplyOutcome::Unchanged);
}
let object = self
.inner
.create_deployment_object()
.id(stack.id)
.body(CreateDeploymentObjectRequest {
yaml_content,
is_deletion_marker: Some(false),
})
.send()
.await?
.into_inner();
Ok(if had_prior {
ApplyOutcome::Updated(object)
} else {
ApplyOutcome::Created(object)
})
}
pub async fn retry<F, Fut, T>(&self, mut op: F) -> Result<T, BrokkrError>
where
F: FnMut(&Client) -> Fut,
Fut: std::future::Future<Output = Result<T, BrokkrError>>,
{
let mut attempt: u32 = 0;
loop {
match op(&self.inner).await {
Ok(value) => return Ok(value),
Err(err) if !err.is_retryable() || attempt >= self.max_retries => {
return Err(err);
}
Err(_) => {
let backoff = self
.initial_backoff
.saturating_mul(1u32 << attempt)
.min(Duration::from_secs(10));
tokio::time::sleep(backoff).await;
attempt += 1;
}
}
}
}
}
#[derive(Debug)]
pub enum ApplyOutcome {
Created(DeploymentObject),
Updated(DeploymentObject),
Unchanged,
}
fn read_manifests(path: &Path) -> Result<String, BrokkrError> {
let files = collect_manifest_files(path)?;
if files.is_empty() {
return Err(BrokkrError::InvalidRequest(format!(
"no .yaml/.yml manifests found in {}",
path.display()
)));
}
let mut parts: Vec<String> = Vec::with_capacity(files.len());
for file in &files {
let content = std::fs::read_to_string(file).map_err(|e| {
BrokkrError::InvalidRequest(format!("cannot read {}: {e}", file.display()))
})?;
validate_manifest_documents(&content, file)?;
parts.push(content.trim_end().to_string());
}
Ok(format!("{}\n", parts.join("\n---\n")))
}
fn collect_manifest_files(path: &Path) -> Result<Vec<std::path::PathBuf>, BrokkrError> {
if path.is_file() {
return Ok(vec![path.to_path_buf()]);
}
if !path.is_dir() {
return Err(BrokkrError::InvalidRequest(format!(
"path not found: {}",
path.display()
)));
}
let mut files: Vec<std::path::PathBuf> = std::fs::read_dir(path)
.map_err(|e| {
BrokkrError::InvalidRequest(format!("cannot read directory {}: {e}", path.display()))
})?
.filter_map(|entry| entry.ok().map(|e| e.path()))
.filter(|p| {
p.is_file()
&& matches!(
p.extension().and_then(|s| s.to_str()),
Some("yaml") | Some("yml")
)
})
.collect();
files.sort();
Ok(files)
}
fn validate_manifest_documents(content: &str, file: &Path) -> Result<(), BrokkrError> {
use serde::Deserialize;
for doc in serde_yaml::Deserializer::from_str(content) {
let value = serde_yaml::Value::deserialize(doc).map_err(|e| {
BrokkrError::InvalidRequest(format!("{}: invalid YAML: {e}", file.display()))
})?;
if value.is_null() {
continue;
}
let has = |key: &str| value.get(key).and_then(|v| v.as_str()).is_some();
if !has("apiVersion") || !has("kind") {
return Err(BrokkrError::InvalidRequest(format!(
"{}: every manifest document must have apiVersion and kind",
file.display()
)));
}
}
Ok(())
}
fn sha256_hex(content: &str) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(content.as_bytes());
format!("{:x}", hasher.finalize())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builder_constructs_without_token() {
use progenitor_client::ClientInfo;
let c = BrokkrClient::builder("http://localhost:3000/api/v1")
.build()
.expect("builder should succeed");
assert_eq!(c.api().baseurl(), "http://localhost:3000/api/v1");
}
#[test]
fn builder_accepts_token_and_timeouts() {
let c = BrokkrClient::builder("http://localhost:3000/api/v1")
.token("bk_admin_test_token")
.request_timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(2))
.max_retries(5)
.initial_backoff(Duration::from_millis(50))
.build()
.expect("builder should succeed");
assert_eq!(c.max_retries, 5);
assert_eq!(c.initial_backoff, Duration::from_millis(50));
}
#[test]
fn invalid_token_header_is_rejected() {
let result = BrokkrClient::builder("http://localhost:3000/api/v1")
.token("invalid\nheader\rvalue")
.build();
assert!(matches!(result, Err(BrokkrError::InvalidRequest(_))));
}
#[test]
fn error_code_extracted_from_api_response() {
let err = BrokkrError::Api(
ErrorResponse {
code: "agent_not_found".to_string(),
message: "agent not found".to_string(),
details: None,
},
reqwest::StatusCode::NOT_FOUND,
);
assert_eq!(err.code(), Some("agent_not_found"));
assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));
assert!(!err.is_retryable());
}
#[test]
fn retryable_classification() {
for status in [408u16, 429, 502, 503, 504] {
let err = BrokkrError::Api(
ErrorResponse {
code: "transient".to_string(),
message: "x".to_string(),
details: None,
},
reqwest::StatusCode::from_u16(status).unwrap(),
);
assert!(err.is_retryable(), "{status} should be retryable");
}
for status in [400u16, 401, 403, 404, 409, 422, 500, 501] {
let err = BrokkrError::Api(
ErrorResponse {
code: "non_transient".to_string(),
message: "x".to_string(),
details: None,
},
reqwest::StatusCode::from_u16(status).unwrap(),
);
assert!(!err.is_retryable(), "{status} should NOT be retryable");
}
}
#[tokio::test(start_paused = true)]
async fn retry_stops_after_max_attempts() {
let client = BrokkrClient::builder("http://localhost:3000/api/v1")
.max_retries(2)
.initial_backoff(Duration::from_millis(1))
.build()
.unwrap();
let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
let calls_clone = calls.clone();
let result: Result<(), BrokkrError> = client
.retry(|_| {
let calls = calls_clone.clone();
async move {
calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Err(BrokkrError::Api(
ErrorResponse {
code: "transient".to_string(),
message: "service unavailable".to_string(),
details: None,
},
reqwest::StatusCode::SERVICE_UNAVAILABLE,
))
}
})
.await;
assert!(result.is_err());
assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 3);
}
#[test]
fn ws_wrapper_methods_compile_with_expected_signatures() {
fn _assert_signatures() {
async fn _types_check() {
let c = BrokkrClient::builder("http://localhost:3000/api/v1")
.build()
.unwrap();
let id = uuid::Uuid::nil();
let _ev: K8sEventHistoryResponse =
c.list_telemetry_events(id, None, None).await.unwrap();
let _lo: PodLogHistoryResponse =
c.list_telemetry_logs(id, None, Some(100)).await.unwrap();
let _co: WsConnectionsResponse = c.list_ws_connections().await.unwrap();
}
let _ = _types_check;
}
}
#[tokio::test(start_paused = true)]
async fn retry_returns_immediately_on_non_retryable() {
let client = BrokkrClient::builder("http://localhost:3000/api/v1")
.max_retries(5)
.build()
.unwrap();
let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
let calls_clone = calls.clone();
let result: Result<(), BrokkrError> = client
.retry(|_| {
let calls = calls_clone.clone();
async move {
calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Err(BrokkrError::Api(
ErrorResponse {
code: "agent_not_found".to_string(),
message: "x".to_string(),
details: None,
},
reqwest::StatusCode::NOT_FOUND,
))
}
})
.await;
assert!(result.is_err());
assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 1);
}
fn write(dir: &std::path::Path, name: &str, content: &str) {
std::fs::write(dir.join(name), content).unwrap();
}
#[test]
fn read_manifests_concatenates_folder_in_sorted_order() {
let dir = tempfile::tempdir().unwrap();
write(dir.path(), "02-deploy.yaml", "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: d\n");
write(dir.path(), "01-namespace.yaml", "apiVersion: v1\nkind: Namespace\nmetadata:\n name: ns\n");
write(dir.path(), "notes.txt", "ignored");
let stream = read_manifests(dir.path()).unwrap();
let ns_at = stream.find("kind: Namespace").unwrap();
let dep_at = stream.find("kind: Deployment").unwrap();
assert!(ns_at < dep_at, "01-namespace should come before 02-deploy");
assert!(stream.contains("\n---\n"), "documents joined with a separator");
assert!(!stream.contains("ignored"), "non-yaml files are skipped");
}
#[test]
fn read_manifests_accepts_single_file_and_multidoc() {
let dir = tempfile::tempdir().unwrap();
write(dir.path(), "all.yaml", "apiVersion: v1\nkind: Namespace\nmetadata:\n name: a\n---\napiVersion: v1\nkind: ConfigMap\nmetadata:\n name: b\n");
let stream = read_manifests(&dir.path().join("all.yaml")).unwrap();
assert!(stream.contains("kind: Namespace") && stream.contains("kind: ConfigMap"));
}
#[test]
fn read_manifests_rejects_missing_apiversion_or_kind() {
let dir = tempfile::tempdir().unwrap();
write(dir.path(), "bad.yaml", "kind: ConfigMap\nmetadata:\n name: x\n");
let err = read_manifests(dir.path()).unwrap_err();
assert!(matches!(err, BrokkrError::InvalidRequest(_)), "got {err:?}");
}
#[test]
fn read_manifests_rejects_malformed_yaml() {
let dir = tempfile::tempdir().unwrap();
write(dir.path(), "bad.yaml", "kind: : : [unbalanced");
assert!(read_manifests(dir.path()).is_err());
}
#[test]
fn read_manifests_errors_on_empty_dir_and_missing_path() {
let dir = tempfile::tempdir().unwrap();
assert!(read_manifests(dir.path()).is_err(), "empty dir");
assert!(read_manifests(&dir.path().join("nope")).is_err(), "missing path");
}
#[test]
fn sha256_hex_is_stable_and_matches_known_vector() {
assert_eq!(
sha256_hex(""),
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
);
let a = "apiVersion: v1\nkind: ConfigMap\n";
assert_eq!(sha256_hex(a), sha256_hex(a));
assert_ne!(sha256_hex(a), sha256_hex("apiVersion: v1\nkind: Secret\n"));
}
}