1use std::time::Duration;
34
35use progenitor_client::Error as RawError;
36use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
37
38use crate::Client;
39use crate::types::{
40 CreateDeploymentObjectRequest, DeploymentObject, ErrorResponse, K8sEventHistoryResponse,
41 NewStack, PodLogHistoryResponse, Stack, WsConnectionsResponse,
42};
43use chrono::{DateTime, Utc};
44use std::path::Path;
45use uuid::Uuid;
46
47#[derive(Debug)]
51pub enum BrokkrError {
52 Api(ErrorResponse, reqwest::StatusCode),
55 Transport(reqwest::Error),
57 UnexpectedResponse {
60 status: Option<reqwest::StatusCode>,
61 detail: String,
62 },
63 InvalidRequest(String),
65}
66
67impl BrokkrError {
68 pub fn status(&self) -> Option<reqwest::StatusCode> {
70 match self {
71 Self::Api(_, status) => Some(*status),
72 Self::Transport(e) => e.status(),
73 Self::UnexpectedResponse { status, .. } => *status,
74 Self::InvalidRequest(_) => None,
75 }
76 }
77
78 pub fn code(&self) -> Option<&str> {
81 match self {
82 Self::Api(body, _) => Some(&body.code),
83 _ => None,
84 }
85 }
86
87 pub fn is_retryable(&self) -> bool {
91 match self {
92 Self::Transport(_) => true,
93 Self::Api(_, status) => is_retryable_status(*status),
94 Self::UnexpectedResponse {
95 status: Some(status),
96 ..
97 } => is_retryable_status(*status),
98 _ => false,
99 }
100 }
101}
102
103impl std::fmt::Display for BrokkrError {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 match self {
106 Self::Api(body, status) => {
107 write!(f, "{} {}: {}", status.as_u16(), body.code, body.message)
108 }
109 Self::Transport(e) => write!(f, "transport error: {e}"),
110 Self::UnexpectedResponse { status, detail } => match status {
111 Some(s) => write!(f, "unexpected response ({}): {}", s.as_u16(), detail),
112 None => write!(f, "unexpected response: {detail}"),
113 },
114 Self::InvalidRequest(msg) => write!(f, "invalid request: {msg}"),
115 }
116 }
117}
118
119impl std::error::Error for BrokkrError {}
120
121impl From<RawError<ErrorResponse>> for BrokkrError {
122 fn from(err: RawError<ErrorResponse>) -> Self {
123 match err {
124 RawError::ErrorResponse(rv) => {
125 let status = rv.status();
126 Self::Api(rv.into_inner(), status)
127 }
128 RawError::CommunicationError(e)
129 | RawError::InvalidUpgrade(e)
130 | RawError::ResponseBodyError(e) => Self::Transport(e),
131 RawError::InvalidRequest(msg) => Self::InvalidRequest(msg),
132 RawError::InvalidResponsePayload(bytes, e) => Self::UnexpectedResponse {
133 status: None,
134 detail: format!(
135 "payload deserialization failed: {e} ({} bytes)",
136 bytes.len()
137 ),
138 },
139 RawError::UnexpectedResponse(resp) => Self::UnexpectedResponse {
140 status: Some(resp.status()),
141 detail: "response not described in OpenAPI spec".to_string(),
142 },
143 RawError::Custom(s) => Self::InvalidRequest(s),
144 }
145 }
146}
147
148fn is_retryable_status(status: reqwest::StatusCode) -> bool {
149 matches!(status.as_u16(), 408 | 429 | 502 | 503 | 504)
150}
151
152#[derive(Debug)]
154pub struct BrokkrClientBuilder {
155 base_url: String,
156 token: Option<String>,
157 request_timeout: Duration,
158 connect_timeout: Duration,
159 max_retries: u32,
160 initial_backoff: Duration,
161}
162
163impl BrokkrClientBuilder {
164 fn new(base_url: impl Into<String>) -> Self {
165 Self {
166 base_url: base_url.into(),
167 token: None,
168 request_timeout: Duration::from_secs(30),
169 connect_timeout: Duration::from_secs(10),
170 max_retries: 3,
171 initial_backoff: Duration::from_millis(200),
172 }
173 }
174
175 pub fn token(mut self, token: impl Into<String>) -> Self {
179 self.token = Some(token.into());
180 self
181 }
182
183 pub fn request_timeout(mut self, timeout: Duration) -> Self {
185 self.request_timeout = timeout;
186 self
187 }
188
189 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
191 self.connect_timeout = timeout;
192 self
193 }
194
195 pub fn max_retries(mut self, max: u32) -> Self {
198 self.max_retries = max;
199 self
200 }
201
202 pub fn initial_backoff(mut self, initial: Duration) -> Self {
205 self.initial_backoff = initial;
206 self
207 }
208
209 pub fn build(self) -> Result<BrokkrClient, BrokkrError> {
210 let mut headers = HeaderMap::new();
211 if let Some(token) = &self.token {
212 let value = HeaderValue::from_str(token).map_err(|e| {
213 BrokkrError::InvalidRequest(format!("invalid token header value: {e}"))
214 })?;
215 headers.insert(AUTHORIZATION, value);
216 }
217
218 let reqwest_client = reqwest::Client::builder()
219 .default_headers(headers)
220 .connect_timeout(self.connect_timeout)
221 .timeout(self.request_timeout)
222 .build()
223 .map_err(BrokkrError::Transport)?;
224
225 let inner = Client::new_with_client(&self.base_url, reqwest_client);
226 Ok(BrokkrClient {
227 inner,
228 max_retries: self.max_retries,
229 initial_backoff: self.initial_backoff,
230 })
231 }
232}
233
234#[derive(Debug, Clone)]
240pub struct BrokkrClient {
241 inner: Client,
242 max_retries: u32,
243 initial_backoff: Duration,
244}
245
246impl BrokkrClient {
247 pub fn builder(base_url: impl Into<String>) -> BrokkrClientBuilder {
250 BrokkrClientBuilder::new(base_url)
251 }
252
253 pub fn api(&self) -> &Client {
257 &self.inner
258 }
259
260 pub async fn list_telemetry_events(
274 &self,
275 stack_id: Uuid,
276 since: Option<DateTime<Utc>>,
277 limit: Option<i64>,
278 ) -> Result<K8sEventHistoryResponse, BrokkrError> {
279 let mut req = self.inner.list_telemetry_events().id(stack_id);
280 if let Some(since) = since {
281 req = req.since(since);
282 }
283 if let Some(limit) = limit {
284 req = req.limit(limit);
285 }
286 let resp = req.send().await?;
287 Ok(resp.into_inner())
288 }
289
290 pub async fn list_telemetry_logs(
294 &self,
295 stack_id: Uuid,
296 since: Option<DateTime<Utc>>,
297 limit: Option<i64>,
298 ) -> Result<PodLogHistoryResponse, BrokkrError> {
299 let mut req = self.inner.list_telemetry_logs().id(stack_id);
300 if let Some(since) = since {
301 req = req.since(since);
302 }
303 if let Some(limit) = limit {
304 req = req.limit(limit);
305 }
306 let resp = req.send().await?;
307 Ok(resp.into_inner())
308 }
309
310 pub async fn list_ws_connections(&self) -> Result<WsConnectionsResponse, BrokkrError> {
315 let resp = self.inner.list_ws_connections().send().await?;
316 Ok(resp.into_inner())
317 }
318
319 pub async fn submit_manifests(
338 &self,
339 stack_id: Uuid,
340 path: impl AsRef<Path>,
341 ) -> Result<DeploymentObject, BrokkrError> {
342 let yaml_content = read_manifests(path.as_ref())?;
343 let resp = self
344 .inner
345 .create_deployment_object()
346 .id(stack_id)
347 .body(CreateDeploymentObjectRequest {
348 yaml_content,
349 is_deletion_marker: Some(false),
350 })
351 .send()
352 .await?;
353 Ok(resp.into_inner())
354 }
355
356 pub async fn apply(
366 &self,
367 stack_name: &str,
368 path: impl AsRef<Path>,
369 targeting: &[String],
370 ) -> Result<ApplyOutcome, BrokkrError> {
371 let yaml_content = read_manifests(path.as_ref())?;
372 let checksum = sha256_hex(&yaml_content);
373
374 let auth = self.inner.verify_pak().send().await?.into_inner();
376 let generator_id = auth
377 .generator
378 .ok_or_else(|| {
379 BrokkrError::InvalidRequest(
380 "apply by name requires a generator PAK; admin callers should create the \
381 stack explicitly and use submit_manifests"
382 .to_string(),
383 )
384 })
385 .and_then(|g| {
386 Uuid::parse_str(&g).map_err(|e| BrokkrError::UnexpectedResponse {
387 status: None,
388 detail: format!("auth response generator id is not a UUID: {e}"),
389 })
390 })?;
391
392 let stacks: Vec<Stack> = self.inner.list_stacks().send().await?.into_inner();
394 let stack = match stacks.into_iter().find(|s| s.name == stack_name) {
395 Some(s) => s,
396 None => self
397 .inner
398 .create_stack()
399 .body(NewStack {
400 name: stack_name.to_string(),
401 generator_id,
402 description: None,
403 })
404 .send()
405 .await?
406 .into_inner(),
407 };
408
409 for label in targeting {
411 if let Err(e) = self
412 .inner
413 .stacks_add_label()
414 .id(stack.id)
415 .body(label.clone())
416 .send()
417 .await
418 {
419 let err = BrokkrError::from(e);
420 if err.status() != Some(reqwest::StatusCode::CONFLICT) {
421 return Err(err);
422 }
423 }
424 }
425
426 let objects: Vec<DeploymentObject> = self
428 .inner
429 .list_deployment_objects()
430 .id(stack.id)
431 .send()
432 .await?
433 .into_inner();
434 let had_prior = !objects.is_empty();
435 let already_current = objects
436 .iter()
437 .max_by_key(|o| o.sequence_id)
438 .map(|latest| latest.yaml_checksum == checksum)
439 .unwrap_or(false);
440 if already_current {
441 return Ok(ApplyOutcome::Unchanged);
442 }
443
444 let object = self
445 .inner
446 .create_deployment_object()
447 .id(stack.id)
448 .body(CreateDeploymentObjectRequest {
449 yaml_content,
450 is_deletion_marker: Some(false),
451 })
452 .send()
453 .await?
454 .into_inner();
455
456 Ok(if had_prior {
457 ApplyOutcome::Updated(object)
458 } else {
459 ApplyOutcome::Created(object)
460 })
461 }
462
463 pub async fn retry<F, Fut, T>(&self, mut op: F) -> Result<T, BrokkrError>
473 where
474 F: FnMut(&Client) -> Fut,
475 Fut: std::future::Future<Output = Result<T, BrokkrError>>,
476 {
477 let mut attempt: u32 = 0;
478 loop {
479 match op(&self.inner).await {
480 Ok(value) => return Ok(value),
481 Err(err) if !err.is_retryable() || attempt >= self.max_retries => {
482 return Err(err);
483 }
484 Err(_) => {
485 let backoff = self
486 .initial_backoff
487 .saturating_mul(1u32 << attempt)
488 .min(Duration::from_secs(10));
489 tokio::time::sleep(backoff).await;
490 attempt += 1;
491 }
492 }
493 }
494 }
495}
496
497#[derive(Debug)]
499pub enum ApplyOutcome {
500 Created(DeploymentObject),
502 Updated(DeploymentObject),
504 Unchanged,
506}
507
508fn read_manifests(path: &Path) -> Result<String, BrokkrError> {
514 let files = collect_manifest_files(path)?;
515 if files.is_empty() {
516 return Err(BrokkrError::InvalidRequest(format!(
517 "no .yaml/.yml manifests found in {}",
518 path.display()
519 )));
520 }
521 let mut parts: Vec<String> = Vec::with_capacity(files.len());
522 for file in &files {
523 let content = std::fs::read_to_string(file).map_err(|e| {
524 BrokkrError::InvalidRequest(format!("cannot read {}: {e}", file.display()))
525 })?;
526 validate_manifest_documents(&content, file)?;
527 parts.push(content.trim_end().to_string());
528 }
529 Ok(format!("{}\n", parts.join("\n---\n")))
530}
531
532fn collect_manifest_files(path: &Path) -> Result<Vec<std::path::PathBuf>, BrokkrError> {
534 if path.is_file() {
535 return Ok(vec![path.to_path_buf()]);
536 }
537 if !path.is_dir() {
538 return Err(BrokkrError::InvalidRequest(format!(
539 "path not found: {}",
540 path.display()
541 )));
542 }
543 let mut files: Vec<std::path::PathBuf> = std::fs::read_dir(path)
544 .map_err(|e| {
545 BrokkrError::InvalidRequest(format!("cannot read directory {}: {e}", path.display()))
546 })?
547 .filter_map(|entry| entry.ok().map(|e| e.path()))
548 .filter(|p| {
549 p.is_file()
550 && matches!(
551 p.extension().and_then(|s| s.to_str()),
552 Some("yaml") | Some("yml")
553 )
554 })
555 .collect();
556 files.sort();
557 Ok(files)
558}
559
560fn validate_manifest_documents(content: &str, file: &Path) -> Result<(), BrokkrError> {
563 use serde::Deserialize;
564 for doc in serde_yaml::Deserializer::from_str(content) {
565 let value = serde_yaml::Value::deserialize(doc).map_err(|e| {
566 BrokkrError::InvalidRequest(format!("{}: invalid YAML: {e}", file.display()))
567 })?;
568 if value.is_null() {
569 continue;
570 }
571 let has = |key: &str| value.get(key).and_then(|v| v.as_str()).is_some();
572 if !has("apiVersion") || !has("kind") {
573 return Err(BrokkrError::InvalidRequest(format!(
574 "{}: every manifest document must have apiVersion and kind",
575 file.display()
576 )));
577 }
578 }
579 Ok(())
580}
581
582fn sha256_hex(content: &str) -> String {
585 use sha2::{Digest, Sha256};
586 let mut hasher = Sha256::new();
587 hasher.update(content.as_bytes());
588 format!("{:x}", hasher.finalize())
589}
590
591#[cfg(test)]
592mod tests {
593 use super::*;
594
595 #[test]
596 fn builder_constructs_without_token() {
597 use progenitor_client::ClientInfo;
598 let c = BrokkrClient::builder("http://localhost:3000/api/v1")
599 .build()
600 .expect("builder should succeed");
601 assert_eq!(c.api().baseurl(), "http://localhost:3000/api/v1");
602 }
603
604 #[test]
605 fn builder_accepts_token_and_timeouts() {
606 let c = BrokkrClient::builder("http://localhost:3000/api/v1")
607 .token("bk_admin_test_token")
608 .request_timeout(Duration::from_secs(5))
609 .connect_timeout(Duration::from_secs(2))
610 .max_retries(5)
611 .initial_backoff(Duration::from_millis(50))
612 .build()
613 .expect("builder should succeed");
614 assert_eq!(c.max_retries, 5);
615 assert_eq!(c.initial_backoff, Duration::from_millis(50));
616 }
617
618 #[test]
619 fn invalid_token_header_is_rejected() {
620 let result = BrokkrClient::builder("http://localhost:3000/api/v1")
621 .token("invalid\nheader\rvalue")
622 .build();
623 assert!(matches!(result, Err(BrokkrError::InvalidRequest(_))));
624 }
625
626 #[test]
627 fn error_code_extracted_from_api_response() {
628 let err = BrokkrError::Api(
629 ErrorResponse {
630 code: "agent_not_found".to_string(),
631 message: "agent not found".to_string(),
632 details: None,
633 },
634 reqwest::StatusCode::NOT_FOUND,
635 );
636 assert_eq!(err.code(), Some("agent_not_found"));
637 assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));
638 assert!(!err.is_retryable());
639 }
640
641 #[test]
642 fn retryable_classification() {
643 for status in [408u16, 429, 502, 503, 504] {
644 let err = BrokkrError::Api(
645 ErrorResponse {
646 code: "transient".to_string(),
647 message: "x".to_string(),
648 details: None,
649 },
650 reqwest::StatusCode::from_u16(status).unwrap(),
651 );
652 assert!(err.is_retryable(), "{status} should be retryable");
653 }
654 for status in [400u16, 401, 403, 404, 409, 422, 500, 501] {
655 let err = BrokkrError::Api(
656 ErrorResponse {
657 code: "non_transient".to_string(),
658 message: "x".to_string(),
659 details: None,
660 },
661 reqwest::StatusCode::from_u16(status).unwrap(),
662 );
663 assert!(!err.is_retryable(), "{status} should NOT be retryable");
664 }
665 }
666
667 #[tokio::test(start_paused = true)]
668 async fn retry_stops_after_max_attempts() {
669 let client = BrokkrClient::builder("http://localhost:3000/api/v1")
670 .max_retries(2)
671 .initial_backoff(Duration::from_millis(1))
672 .build()
673 .unwrap();
674 let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
675 let calls_clone = calls.clone();
676 let result: Result<(), BrokkrError> = client
677 .retry(|_| {
678 let calls = calls_clone.clone();
679 async move {
680 calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
681 Err(BrokkrError::Api(
683 ErrorResponse {
684 code: "transient".to_string(),
685 message: "service unavailable".to_string(),
686 details: None,
687 },
688 reqwest::StatusCode::SERVICE_UNAVAILABLE,
689 ))
690 }
691 })
692 .await;
693 assert!(result.is_err());
694 assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 3);
696 }
697
698 #[test]
707 fn ws_wrapper_methods_compile_with_expected_signatures() {
708 fn _assert_signatures() {
709 async fn _types_check() {
710 let c = BrokkrClient::builder("http://localhost:3000/api/v1")
711 .build()
712 .unwrap();
713 let id = uuid::Uuid::nil();
714 let _ev: K8sEventHistoryResponse =
715 c.list_telemetry_events(id, None, None).await.unwrap();
716 let _lo: PodLogHistoryResponse =
717 c.list_telemetry_logs(id, None, Some(100)).await.unwrap();
718 let _co: WsConnectionsResponse = c.list_ws_connections().await.unwrap();
719 }
720 let _ = _types_check;
721 }
722 }
723
724 #[tokio::test(start_paused = true)]
725 async fn retry_returns_immediately_on_non_retryable() {
726 let client = BrokkrClient::builder("http://localhost:3000/api/v1")
727 .max_retries(5)
728 .build()
729 .unwrap();
730 let calls = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
731 let calls_clone = calls.clone();
732 let result: Result<(), BrokkrError> = client
733 .retry(|_| {
734 let calls = calls_clone.clone();
735 async move {
736 calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
737 Err(BrokkrError::Api(
738 ErrorResponse {
739 code: "agent_not_found".to_string(),
740 message: "x".to_string(),
741 details: None,
742 },
743 reqwest::StatusCode::NOT_FOUND,
744 ))
745 }
746 })
747 .await;
748 assert!(result.is_err());
749 assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 1);
750 }
751
752 fn write(dir: &std::path::Path, name: &str, content: &str) {
755 std::fs::write(dir.join(name), content).unwrap();
756 }
757
758 #[test]
759 fn read_manifests_concatenates_folder_in_sorted_order() {
760 let dir = tempfile::tempdir().unwrap();
761 write(dir.path(), "02-deploy.yaml", "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: d\n");
763 write(dir.path(), "01-namespace.yaml", "apiVersion: v1\nkind: Namespace\nmetadata:\n name: ns\n");
764 write(dir.path(), "notes.txt", "ignored");
765 let stream = read_manifests(dir.path()).unwrap();
766 let ns_at = stream.find("kind: Namespace").unwrap();
767 let dep_at = stream.find("kind: Deployment").unwrap();
768 assert!(ns_at < dep_at, "01-namespace should come before 02-deploy");
769 assert!(stream.contains("\n---\n"), "documents joined with a separator");
770 assert!(!stream.contains("ignored"), "non-yaml files are skipped");
771 }
772
773 #[test]
774 fn read_manifests_accepts_single_file_and_multidoc() {
775 let dir = tempfile::tempdir().unwrap();
776 write(dir.path(), "all.yaml", "apiVersion: v1\nkind: Namespace\nmetadata:\n name: a\n---\napiVersion: v1\nkind: ConfigMap\nmetadata:\n name: b\n");
777 let stream = read_manifests(&dir.path().join("all.yaml")).unwrap();
778 assert!(stream.contains("kind: Namespace") && stream.contains("kind: ConfigMap"));
779 }
780
781 #[test]
782 fn read_manifests_rejects_missing_apiversion_or_kind() {
783 let dir = tempfile::tempdir().unwrap();
784 write(dir.path(), "bad.yaml", "kind: ConfigMap\nmetadata:\n name: x\n");
785 let err = read_manifests(dir.path()).unwrap_err();
786 assert!(matches!(err, BrokkrError::InvalidRequest(_)), "got {err:?}");
787 }
788
789 #[test]
790 fn read_manifests_rejects_malformed_yaml() {
791 let dir = tempfile::tempdir().unwrap();
792 write(dir.path(), "bad.yaml", "kind: : : [unbalanced");
793 assert!(read_manifests(dir.path()).is_err());
794 }
795
796 #[test]
797 fn read_manifests_errors_on_empty_dir_and_missing_path() {
798 let dir = tempfile::tempdir().unwrap();
799 assert!(read_manifests(dir.path()).is_err(), "empty dir");
800 assert!(read_manifests(&dir.path().join("nope")).is_err(), "missing path");
801 }
802
803 #[test]
804 fn sha256_hex_is_stable_and_matches_known_vector() {
805 assert_eq!(
807 sha256_hex(""),
808 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
809 );
810 let a = "apiVersion: v1\nkind: ConfigMap\n";
811 assert_eq!(sha256_hex(a), sha256_hex(a));
812 assert_ne!(sha256_hex(a), sha256_hex("apiVersion: v1\nkind: Secret\n"));
813 }
814
815}